rpc: event websocket subscription (#308)

* rpc: event websocket subscription

* rpc: use tendermint event subscriptions

* new log events

* filter evm transactions

* filter logs

* wip: refactor filters

* remove custom BlockNumber

* wip: refactor rpc

* HeaderByNumber and HeaderByHash

* update Tendermint event system

* update Filter

* update EventSystem

* fix lint issues

* update rpc filters

* upgrade to tendermint v0.33.4

* update filters

* fix unsubscription

* updates wip

* initialize channels

* cleanup go routines

* pass ResultEvent channel on subscription

* error channel

* add block filter changes test

* add eventCh loop

* pass funcs in select go func, block filter working

* cleanup

* lint

* NewFilter and GetFilterChanges working

* eth_getLogs working

* lint

* lint

* cleanup

* remove logs and minor fixes

* changelog

* address @noot comments

* revert BlockNumber removal

Co-authored-by: noot <elizabethjbinks@gmail.com>
This commit is contained in:
Federico Kunze 2020-07-03 17:40:00 +02:00 committed by GitHub
parent c8b8f49675
commit 20e9b2ede3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1304 additions and 334 deletions

View File

@ -65,8 +65,9 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Features
* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement NewBlockFilter in rpc/filters.go which instantiates a polling block filter
* Polls for new blocks via BlockNumber rpc call; if block number changes, it requests the new block via GetBlockByNumber rpc call and adds it to its internal list of blocks
* (rpc) [\#330](https://github.com/ChainSafe/ethermint/issues/330) Implement `PublicFilterAPI`'s `EventSystem` which subscribes to Tendermint events upon `Filter` creation.
* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement `NewBlockFilter` in rpc/filters.go which instantiates a polling block filter
* Polls for new blocks via `BlockNumber` rpc call; if block number changes, it requests the new block via `GetBlockByNumber` rpc call and adds it to its internal list of blocks
* Update uninstallFilter and getFilterChanges accordingly
* uninstallFilter stops the polling goroutine
* getFilterChanges returns the filter's internal list of block hashes and resets it

10
go.mod
View File

@ -5,7 +5,6 @@ go 1.14
require (
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20200331225509-2cc472e8fbd6 // indirect
github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/cespare/cp v1.1.1 // indirect
github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5
github.com/deckarep/golang-set v1.7.1 // indirect
@ -22,15 +21,16 @@ require (
github.com/regen-network/cosmos-proto v0.1.1-0.20200213154359-02baa11ea7c2
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.7
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.0
github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 // indirect
github.com/stretchr/testify v1.6.1
github.com/tendermint/go-amino v0.15.1
github.com/tendermint/tendermint v0.33.3
github.com/tendermint/tendermint v0.33.4
github.com/tendermint/tm-db v0.5.1
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71
gopkg.in/yaml.v2 v2.3.0
)
replace github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5
// forked SDK to avoid breaking changes
replace github.com/cosmos/cosmos-sdk => github.com/Chainsafe/cosmos-sdk v0.34.4-0.20200622114457-35ea97f29c5f

46
go.sum
View File

@ -31,6 +31,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200102211924-4bcbc698314f h1:4O1om+UVU+Hfcihr1timk8YNXHxzZWgCo7ofnrZRApw=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200102211924-4bcbc698314f/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/Chainsafe/cosmos-sdk v0.34.4-0.20200622114457-35ea97f29c5f h1:hLvatKcr7PZPWlwBb08oSxdfd7bN5JT0d3MKIwm3zEk=
github.com/Chainsafe/cosmos-sdk v0.34.4-0.20200622114457-35ea97f29c5f/go.mod h1:brXC4wuGawcC5pQebaWER22hzunmXFLgN8vajUh+xhE=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
@ -42,6 +46,7 @@ github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQu
github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8=
github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
@ -84,6 +89,8 @@ github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufo
github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d h1:yJzD/yFppdVCf6ApMkVy8cUxV0XrxdP9rVf6D87/Mng=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v1.0.2 h1:9iZ1Terx9fMIOtq1VrwdqfsATL9MC2l8ZrUY6YZ2uts=
github.com/btcsuite/btcutil v1.0.2/go.mod h1:j9HUFwoQRsZL3V4n+qG+CUnEGHOarIxfC3Le2Yhbcts=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
@ -114,8 +121,6 @@ github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5 h1:Up28KmvitVSSms5m+JZUrfYjVF27LvXZVfTb+408HaM=
github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5/go.mod h1:J2RTB23kBgFKwtKd7J/gk4WwG363cA/xM0GU1Gfztw4=
github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d h1:49RLWk1j44Xu4fjHb6JFYmeUnDORVwHNkDxaQ0ctCVU=
github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y=
github.com/cosmos/ledger-cosmos-go v0.11.1 h1:9JIYsGnXP613pb2vPjFeMMjBI5lEDsEaF6oYorTy6J4=
@ -228,8 +233,9 @@ github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaW
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4 h1:+EOh4OY6tjM6ZueeUKinl1f0U2820HzQOuf1iqMnsks=
github.com/golang/protobuf v1.4.0-rc.4/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@ -277,6 +283,8 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/gtank/merlin v0.1.1-0.20191105220539-8318aed1a79f h1:8N8XWLZelZNibkhM1FuF+3Ad3YIbgirjdMiVA0eUkaM=
github.com/gtank/merlin v0.1.1-0.20191105220539-8318aed1a79f/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s=
github.com/gtank/merlin v0.1.1 h1:eQ90iG7K9pOhtereWsmyRJ6RAwcP4tHTDBHXNg+u5is=
github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s=
github.com/gtank/ristretto255 v0.1.2 h1:JEqUCPA1NvLq5DwYtuzigd7ss8fwbYay9fi4/5uMzcc=
github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
@ -472,6 +480,8 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD
github.com/prometheus/client_golang v1.4.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.5.0 h1:Ctq0iGpCmr3jeP77kbF2UxgvRwzWWz+4Bh9/vJTyg1A=
github.com/prometheus/client_golang v1.5.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA=
github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -502,6 +512,8 @@ github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Ung
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/regen-network/cosmos-proto v0.1.1-0.20200213154359-02baa11ea7c2 h1:jQK1YoH972Aptd22YKgtNu5jM2X2xMGkyIENOAc71to=
github.com/regen-network/cosmos-proto v0.1.1-0.20200213154359-02baa11ea7c2/go.mod h1:+r7jN10xXCypD4yBgzKOa+vgLz0riqYMHeDcKekxPvA=
github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho=
@ -539,8 +551,8 @@ github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v0.0.6/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v0.0.7 h1:FfTH+vuMXOas8jmfb5/M7dzEYx7LpcLb7a0LPe34uOU=
github.com/spf13/cobra v0.0.7/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8=
github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
@ -550,6 +562,7 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.6.2/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k=
github.com/spf13/viper v1.6.3/go.mod h1:jUMtyi0/lB5yZH/FjyGAoH7IMNrIhlBf6pXZmbMDvzw=
github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
@ -571,8 +584,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho=
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
@ -590,13 +601,12 @@ github.com/tendermint/crypto v0.0.0-20191022145703-50d29ede1e15/go.mod h1:z4YtwM
github.com/tendermint/go-amino v0.14.1/go.mod h1:i/UKE5Uocn+argJJBb12qTZsCDBcAYMbR92AaJVmKso=
github.com/tendermint/go-amino v0.15.1 h1:D2uk35eT4iTsvJd9jWIetzthE5C0/k2QmMFkCN+4JgQ=
github.com/tendermint/go-amino v0.15.1/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME=
github.com/tendermint/iavl v0.13.2 h1:O1m08/Ciy53l9IYmf75uIRVvrNsfjEbre8u/yCu/oqk=
github.com/tendermint/iavl v0.13.2/go.mod h1:vE1u0XAGXYjHykd4BLp8p/yivrw2PF1TuoljBcsQoGA=
github.com/tendermint/iavl v0.13.3 h1:expgBDY1MX+6/3sqrIxGChbTNf9N9aTJ67SH4bPchCs=
github.com/tendermint/iavl v0.13.3/go.mod h1:2lE7GiWdSvc7kvT78ncIKmkOjCnp6JEnSb2O7B9htLw=
github.com/tendermint/tendermint v0.33.2/go.mod h1:25DqB7YvV1tN3tHsjWoc2vFtlwICfrub9XO6UBO+4xk=
github.com/tendermint/tendermint v0.33.3 h1:6lMqjEoCGejCzAghbvfQgmw87snGSqEhDTo/jw+W8CI=
github.com/tendermint/tendermint v0.33.3/go.mod h1:25DqB7YvV1tN3tHsjWoc2vFtlwICfrub9XO6UBO+4xk=
github.com/tendermint/tendermint v0.33.4 h1:NM3G9618yC5PaaxGrcAySc5ylc1PAANeIx42u2Re/jo=
github.com/tendermint/tendermint v0.33.4/go.mod h1:6NW9DVkvsvqmCY6wbRsOo66qGDhMXglRL70aXajvBEA=
github.com/tendermint/tm-db v0.4.1/go.mod h1:JsJ6qzYkCGiGwm5GHl/H5GLI9XLb6qZX7PRe425dHAY=
github.com/tendermint/tm-db v0.5.0/go.mod h1:lSq7q5WRR/njf1LnhiZ/lIJHk2S8Y1Zyq5oP/3o9C2U=
github.com/tendermint/tm-db v0.5.1 h1:H9HDq8UEA7Eeg13kdYckkgwwkQLBnJGgX4PgLJRhieY=
github.com/tendermint/tm-db v0.5.1/go.mod h1:g92zWjHpCYlEvQXvy9M168Su8V1IBEeawpXVVBaK4f4=
github.com/tjfoc/gmsm v1.3.0/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w=
@ -644,10 +654,13 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 h1:QmwruyY+bKbDDL0BaglrbZABEali68eoMFhTZpCjYVA=
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71 h1:DOmugCavvUtnUD114C1Wh+UgTgQZ4pMLzXxi1pSt+/Y=
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -690,6 +703,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -804,12 +818,14 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.28.1 h1:C1QC6KzgSiLyBabDi87BbjaGreoRgGUF5nOyvfrAZ1k=
google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.20.1 h1:ESRXHgpUBG5D2I5mmsQIyYxB/tQIZfSZ8wLyFDf/N/U=
google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5LIWeI/lY=
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -10,10 +10,15 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
const Web3Namespace = "web3"
const EthNamespace = "eth"
const PersonalNamespace = "personal"
const NetNamespace = "net"
// RPC namespaces and API version
const (
Web3Namespace = "web3"
EthNamespace = "eth"
PersonalNamespace = "personal"
NetNamespace = "net"
apiVersion = "1.0"
)
// GetRPCAPIs returns the list of all APIs
func GetRPCAPIs(cliCtx context.CLIContext, key emintcrypto.PrivKeySecp256k1) []rpc.API {
@ -22,31 +27,31 @@ func GetRPCAPIs(cliCtx context.CLIContext, key emintcrypto.PrivKeySecp256k1) []r
return []rpc.API{
{
Namespace: Web3Namespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicWeb3API(),
Public: true,
},
{
Namespace: EthNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicEthAPI(cliCtx, backend, nonceLock, key),
Public: true,
},
{
Namespace: PersonalNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPersonalEthAPI(cliCtx, nonceLock),
Public: false,
},
{
Namespace: EthNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicFilterAPI(cliCtx, backend),
Public: true,
},
{
Namespace: NetNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicNetAPI(cliCtx),
Public: true,
},

View File

@ -5,6 +5,8 @@ import (
"math/big"
"strconv"
tmtypes "github.com/tendermint/tendermint/types"
evmtypes "github.com/cosmos/ethermint/x/evm/types"
"github.com/cosmos/cosmos-sdk/client/context"
@ -19,20 +21,26 @@ import (
type Backend interface {
// Used by block filter; also used for polling
BlockNumber() (hexutil.Uint64, error)
HeaderByNumber(blockNum BlockNumber) (*ethtypes.Header, error)
HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error)
GetBlockByNumber(blockNum BlockNumber, fullTx bool) (map[string]interface{}, error)
GetBlockByHash(hash common.Hash, fullTx bool) (map[string]interface{}, error)
getEthBlockByNumber(height int64, fullTx bool) (map[string]interface{}, error)
getGasLimit() (int64, error)
// returns the logs of a given block
GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error)
// Used by pending transaction filter
PendingTransactions() ([]*Transaction, error)
// Used by log filter
GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
// TODO: Bloom methods
BloomStatus() (uint64, uint64)
}
// EthermintBackend implements Backend
var _ Backend = (*EthermintBackend)(nil)
// EthermintBackend implements the Backend interface
type EthermintBackend struct {
cliCtx context.CLIContext
gasLimit int64
@ -68,7 +76,7 @@ func (e *EthermintBackend) GetBlockByNumber(blockNum BlockNumber, fullTx bool) (
// GetBlockByHash returns the block identified by hash.
func (e *EthermintBackend) GetBlockByHash(hash common.Hash, fullTx bool) (map[string]interface{}, error) {
res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
res, height, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
if err != nil {
return nil, err
}
@ -78,9 +86,60 @@ func (e *EthermintBackend) GetBlockByHash(hash common.Hash, fullTx bool) (map[st
return nil, err
}
e.cliCtx = e.cliCtx.WithHeight(height)
return e.getEthBlockByNumber(out.Number, fullTx)
}
// HeaderByNumber returns the block header identified by height.
func (e *EthermintBackend) HeaderByNumber(blockNum BlockNumber) (*ethtypes.Header, error) {
return e.getBlockHeader(blockNum.Int64())
}
// HeaderByHash returns the block header identified by hash.
func (e *EthermintBackend) HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error) {
res, height, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, blockHash.Hex()))
if err != nil {
return nil, err
}
var out evmtypes.QueryResBlockNumber
if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil {
return nil, err
}
e.cliCtx = e.cliCtx.WithHeight(height)
return e.getBlockHeader(out.Number)
}
func (e *EthermintBackend) getBlockHeader(height int64) (*ethtypes.Header, error) {
if height <= 0 {
// get latest block height
num, err := e.BlockNumber()
if err != nil {
return nil, err
}
height = int64(num)
}
block, err := e.cliCtx.Client.Block(&height)
if err != nil {
return nil, err
}
res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryBloom, strconv.FormatInt(height, 10)))
if err != nil {
return nil, err
}
var bloomRes evmtypes.QueryBloomFilter
e.cliCtx.Codec.MustUnmarshalJSON(res, &bloomRes)
ethHeader := EthHeaderFromTendermint(block.Block.Header)
ethHeader.Bloom = bloomRes.Bloom
return ethHeader, nil
}
func (e *EthermintBackend) getEthBlockByNumber(height int64, fullTx bool) (map[string]interface{}, error) {
// Remove this check when 0 query is fixed ref: (https://github.com/tendermint/tendermint/issues/4014)
var blkNumPtr *int64
@ -128,7 +187,6 @@ func (e *EthermintBackend) getEthBlockByNumber(height int64, fullTx bool) (map[s
var out evmtypes.QueryBloomFilter
e.cliCtx.Codec.MustUnmarshalJSON(res, &out)
return formatBlock(header, block.Block.Size(), gasLimit, gasUsed, transactions, out.Bloom), nil
}
@ -158,11 +216,12 @@ func (e *EthermintBackend) getGasLimit() (int64, error) {
}
// GetTransactionLogs returns the logs given a transaction hash.
// It returns an error if there's an encoding error.
// If no logs are found for the tx hash, the error is nil.
func (e *EthermintBackend) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) {
// do we need to use the block height somewhere?
ctx := e.cliCtx
res, _, err := ctx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, txHash.Hex()), nil)
res, height, err := ctx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, txHash.Hex()), nil)
if err != nil {
return nil, err
}
@ -172,6 +231,7 @@ func (e *EthermintBackend) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.L
return nil, err
}
e.cliCtx = e.cliCtx.WithHeight(height)
return out.Logs, nil
}
@ -183,7 +243,7 @@ func (e *EthermintBackend) PendingTransactions() ([]*Transaction, error) {
return nil, err
}
transactions := make([]*Transaction, 0, 100)
transactions := make([]*Transaction, pendingTxs.Count)
for _, tx := range pendingTxs.Txs {
ethTx, err := bytesToEthTx(e.cliCtx, tx)
if err != nil {
@ -201,3 +261,64 @@ func (e *EthermintBackend) PendingTransactions() ([]*Transaction, error) {
return transactions, nil
}
// GetLogs returns all the logs from all the ethreum transactions in a block.
func (e *EthermintBackend) GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error) {
res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, blockHash.Hex()))
if err != nil {
return nil, err
}
var out evmtypes.QueryResBlockNumber
if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil {
return nil, err
}
block, err := e.cliCtx.Client.Block(&out.Number)
if err != nil {
return nil, err
}
var blockLogs = [][]*ethtypes.Log{}
for _, tx := range block.Block.Txs {
// NOTE: we query the state in case the tx result logs are not persisted after an upgrade.
res, _, err := e.cliCtx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, common.BytesToHash(tx.Hash()).Hex()), nil)
if err != nil {
continue
}
out := new(evmtypes.QueryETHLogs)
if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil {
return nil, err
}
blockLogs = append(blockLogs, out.Logs)
}
return blockLogs, nil
}
// BloomStatus returns the BloomBitsBlocks and the number of processed sections maintained
// by the chain indexer.
func (e *EthermintBackend) BloomStatus() (uint64, uint64) {
return 4096, 0
}
// EthHeaderFromTendermint is an util function that returns an Ethereum Header
// from a tendermint Header.
func EthHeaderFromTendermint(header tmtypes.Header) *ethtypes.Header {
return &ethtypes.Header{
ParentHash: common.BytesToHash(header.LastBlockID.Hash.Bytes()),
UncleHash: common.Hash{},
Coinbase: common.Address{},
Root: common.BytesToHash(header.AppHash),
TxHash: common.BytesToHash(header.DataHash),
ReceiptHash: common.Hash{},
Difficulty: nil,
Number: big.NewInt(header.Height),
Time: uint64(header.Time.Unix()),
Extra: nil,
MixDigest: common.Hash{},
Nonce: ethtypes.BlockNonce{},
}
}

View File

@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/cosmos/cosmos-sdk/client/context"
"github.com/cosmos/cosmos-sdk/client/flags"
@ -388,7 +387,7 @@ type CallArgs struct {
}
// Call performs a raw contract call.
func (e *PublicEthAPI) Call(args CallArgs, blockNr rpc.BlockNumber, overrides *map[common.Address]account) (hexutil.Bytes, error) {
func (e *PublicEthAPI) Call(args CallArgs, blockNr BlockNumber, overrides *map[common.Address]account) (hexutil.Bytes, error) {
simRes, err := e.doCall(args, blockNr, big.NewInt(emint.DefaultRPCGasLimit))
if err != nil {
return []byte{}, err
@ -419,7 +418,7 @@ type account struct {
// DoCall performs a simulated call operation through the evmtypes. It returns the
// estimated gas used on the operation or an error if fails.
func (e *PublicEthAPI) doCall(
args CallArgs, blockNr rpc.BlockNumber, globalGasCap *big.Int,
args CallArgs, blockNr BlockNumber, globalGasCap *big.Int,
) (*sdk.SimulationResponse, error) {
// Set height for historical queries
ctx := e.cliCtx
@ -561,7 +560,8 @@ func convertTransactionsToRPC(cliCtx context.CLIContext, txs []tmtypes.Tx, block
for i, tx := range txs {
ethTx, err := bytesToEthTx(cliCtx, tx)
if err != nil {
return nil, nil, err
// continue to next transaction in case it's not a MsgEthereumTx
continue
}
// TODO: Remove gas usage calculation if saving gasUsed per block
gasUsed.Add(gasUsed, ethTx.Fee())
@ -602,7 +602,7 @@ func bytesToEthTx(cliCtx context.CLIContext, bz []byte) (*evmtypes.MsgEthereumTx
ethTx, ok := stdTx.(evmtypes.MsgEthereumTx)
if !ok {
return nil, fmt.Errorf("invalid transaction type, must be an amino encoded Ethereum transaction")
return nil, fmt.Errorf("invalid transaction type %T, expected MsgEthereumTx", stdTx)
}
return &ethTx, nil
}

View File

@ -1,79 +1,553 @@
package rpc
import (
"errors"
"context"
"fmt"
"sync"
"time"
"github.com/cosmos/cosmos-sdk/client/context"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc"
clientcontext "github.com/cosmos/cosmos-sdk/client/context"
evmtypes "github.com/cosmos/ethermint/x/evm/types"
)
// PublicFilterAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec.
type PublicFilterAPI struct {
cliCtx context.CLIContext
backend Backend
filters map[rpc.ID]*Filter // ID to filter; TODO: change to sync.Map in case of concurrent writes
// FiltersBackend defines the methods requided by the PublicFilterAPI backend
type FiltersBackend interface {
GetBlockByNumber(blockNum BlockNumber, fullTx bool) (map[string]interface{}, error)
HeaderByNumber(blockNr BlockNumber) (*ethtypes.Header, error)
HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error)
GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error)
GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
BloomStatus() (uint64, uint64)
}
// NewPublicEthAPI creates an instance of the public ETH Web3 API.
func NewPublicFilterAPI(cliCtx context.CLIContext, backend Backend) *PublicFilterAPI {
return &PublicFilterAPI{
// consider a filter inactive if it has not been polled for within deadline
var deadline = 5 * time.Minute
// filter is a helper struct that holds meta information over the filter type
// and associated subscription in the event system.
type filter struct {
typ filters.Type
deadline *time.Timer // filter is inactive when deadline triggers
hashes []common.Hash
crit filters.FilterCriteria
logs []*ethtypes.Log
s *Subscription // associated subscription in event system
}
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
cliCtx clientcontext.CLIContext
backend FiltersBackend
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(cliCtx clientcontext.CLIContext, backend FiltersBackend) *PublicFilterAPI {
// start the client to subscribe to Tendermint events
err := cliCtx.Client.Start()
if err != nil {
panic(err)
}
api := &PublicFilterAPI{
cliCtx: cliCtx,
backend: backend,
filters: make(map[rpc.ID]*Filter),
filters: make(map[rpc.ID]*filter),
events: NewEventSystem(cliCtx.Client),
}
go api.timeoutLoop()
return api
}
// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
// Tt is started when the api is created.
func (api *PublicFilterAPI) timeoutLoop() {
ticker := time.NewTicker(deadline)
defer ticker.Stop()
for {
<-ticker.C
api.filtersMu.Lock()
for id, f := range api.filters {
select {
case <-f.deadline.C:
f.s.Unsubscribe(api.events)
delete(api.filters, id)
default:
continue
}
}
api.filtersMu.Unlock()
}
}
// NewFilter instantiates a new filter.
func (e *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) rpc.ID {
id := rpc.NewID()
e.filters[id] = NewFilter(e.backend, &criteria)
return id
}
// NewBlockFilter instantiates a new block filter.
func (e *PublicFilterAPI) NewBlockFilter() rpc.ID {
id := rpc.NewID()
e.filters[id] = NewBlockFilter(e.backend)
return id
}
// NewPendingTransactionFilter instantiates a new pending transaction filter.
func (e *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
id := rpc.NewID()
e.filters[id] = NewPendingTransactionFilter(e.backend)
return id
}
// UninstallFilter uninstalls a filter with the given ID.
func (e *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
e.filters[id].uninstallFilter()
delete(e.filters, id)
return true
}
// GetFilterChanges returns an array of changes since the last poll.
// If the filter is a log filter, it returns an array of Logs.
// If the filter is a block filter, it returns an array of block hashes.
// If the filter is a pending transaction filter, it returns an array of transaction hashes.
func (e *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
if e.filters[id] == nil {
return nil, errors.New("invalid filter ID")
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used through the
// `eth_getFilterChanges` polling method that is also used for log filters.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newPendingTransactionFilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs()
if err != nil {
// wrap error on the ID
return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", err.Error()))
}
return e.filters[id].getFilterChanges()
api.filtersMu.Lock()
api.filters[pendingTxSub.ID()] = &filter{typ: filters.PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filtersMu.Unlock()
go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) {
defer cancelSubs()
for {
select {
case ev := <-txsCh:
data, _ := ev.Data.(tmtypes.EventDataTx)
txHash := common.BytesToHash(data.Tx.Hash())
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID()]; found {
f.hashes = append(f.hashes, txHash)
}
api.filtersMu.Unlock()
case <-errCh:
api.filtersMu.Lock()
delete(api.filters, pendingTxSub.ID())
api.filtersMu.Unlock()
}
}
}(pendingTxSub.eventCh, pendingTxSub.Err())
return pendingTxSub.ID()
}
// GetFilterLogs returns an array of all logs matching filter with given id.
func (e *PublicFilterAPI) GetFilterLogs(id rpc.ID) ([]*ethtypes.Log, error) {
return e.filters[id].getFilterLogs()
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
ctx, cancelFn := context.WithTimeout(context.Background(), deadline)
defer cancelFn()
api.events.WithContext(ctx)
pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs()
if err != nil {
return nil, err
}
go func(txsCh <-chan coretypes.ResultEvent) {
defer cancelSubs()
for {
select {
case ev := <-txsCh:
data, _ := ev.Data.(tmtypes.EventDataTx)
txHash := common.BytesToHash(data.Tx.Hash())
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
err = notifier.Notify(rpcSub.ID, txHash)
if err != nil {
return
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe(api.events)
return
case <-notifier.Closed():
pendingTxSub.Unsubscribe(api.events)
return
}
}
}(pendingTxSub.eventCh)
return rpcSub, err
}
// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
// It is part of the filter package since polling goes with eth_getFilterChanges.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
headerSub, cancelSubs, err := api.events.SubscribeNewHeads()
if err != nil {
// wrap error on the ID
return rpc.ID(fmt.Sprintf("error creating block filter: %s", err.Error()))
}
api.filtersMu.Lock()
api.filters[headerSub.ID()] = &filter{typ: filters.BlocksSubscription, deadline: time.NewTimer(deadline), hashes: []common.Hash{}, s: headerSub}
api.filtersMu.Unlock()
go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) {
defer cancelSubs()
for {
select {
case ev := <-headersCh:
data, _ := ev.Data.(tmtypes.EventDataNewBlockHeader)
header := EthHeaderFromTendermint(data.Header)
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID()]; found {
f.hashes = append(f.hashes, header.Hash())
}
api.filtersMu.Unlock()
case <-errCh:
api.filtersMu.Lock()
delete(api.filters, headerSub.ID())
api.filtersMu.Unlock()
return
}
}
}(headerSub.eventCh, headerSub.Err())
return headerSub.ID()
}
// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
api.events.WithContext(ctx)
rpcSub := notifier.CreateSubscription()
headersSub, cancelSubs, err := api.events.SubscribeNewHeads()
if err != nil {
return &rpc.Subscription{}, err
}
go func(headersCh <-chan coretypes.ResultEvent) {
defer cancelSubs()
for {
select {
case ev := <-headersCh:
data, ok := ev.Data.(tmtypes.EventDataNewBlockHeader)
if !ok {
err = fmt.Errorf("invalid event data %T, expected %s", ev.Data, tmtypes.EventNewBlockHeader)
headersSub.err <- err
return
}
header := EthHeaderFromTendermint(data.Header)
err = notifier.Notify(rpcSub.ID, header)
if err != nil {
headersSub.err <- err
return
}
case <-rpcSub.Err():
headersSub.Unsubscribe(api.events)
return
case <-notifier.Closed():
headersSub.Unsubscribe(api.events)
return
}
}
}(headersSub.eventCh)
return rpcSub, err
}
// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
api.events.WithContext(ctx)
rpcSub := notifier.CreateSubscription()
logsSub, cancelSubs, err := api.events.SubscribeLogs(crit)
if err != nil {
return &rpc.Subscription{}, err
}
go func(logsCh <-chan coretypes.ResultEvent) {
defer cancelSubs()
for {
select {
case event := <-logsCh:
// filter only events from EVM module txs
_, isMsgEthermint := event.Events[evmtypes.TypeMsgEthermint]
_, isMsgEthereumTx := event.Events[evmtypes.TypeMsgEthereumTx]
if !(isMsgEthermint || isMsgEthereumTx) {
// ignore transaction as it's not from the evm module
return
}
// get transaction result data
dataTx, ok := event.Data.(tmtypes.EventDataTx)
if !ok {
err = fmt.Errorf("invalid event data %T, expected %s", event.Data, tmtypes.EventTx)
logsSub.err <- err
return
}
resultData, err := evmtypes.DecodeResultData(dataTx.TxResult.Result.Data)
if err != nil {
return
}
logs := filterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
for _, log := range logs {
err = notifier.Notify(rpcSub.ID, log)
if err != nil {
return
}
}
case <-rpcSub.Err(): // client send an unsubscribe request
logsSub.Unsubscribe(api.events)
return
case <-notifier.Closed(): // connection dropped
logsSub.Unsubscribe(api.events)
return
}
}
}(logsSub.eventCh)
return rpcSub, err
}
// NewFilter creates a new filter and returns the filter id. It can be
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
//
// Default criteria for the from and to block are "latest".
// Using "latest" as block number will return logs for mined blocks.
// Using "pending" as block number returns logs for not yet mined (pending) blocks.
// In case logs are removed (chain reorg) previously returned logs are returned
// again but with the removed property set to true.
//
// In case "fromBlock" > "toBlock" an error is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, error) {
var (
filterID = rpc.ID("")
err error
)
logsSub, cancelSubs, err := api.events.SubscribeLogs(criteria)
if err != nil {
return rpc.ID(""), err
}
filterID = logsSub.ID()
api.filtersMu.Lock()
api.filters[filterID] = &filter{typ: filters.LogsSubscription, deadline: time.NewTimer(deadline), hashes: []common.Hash{}, s: logsSub}
api.filtersMu.Unlock()
go func(eventCh <-chan coretypes.ResultEvent) {
defer cancelSubs()
for {
select {
case event := <-eventCh:
dataTx, ok := event.Data.(tmtypes.EventDataTx)
if !ok {
err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data)
return
}
var resultData evmtypes.ResultData
resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data)
if err != nil {
return
}
logs := filterLogs(resultData.Logs, criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics)
api.filtersMu.Lock()
if f, found := api.filters[filterID]; found {
f.logs = append(f.logs, logs...)
}
api.filtersMu.Unlock()
case <-logsSub.Err():
api.filtersMu.Lock()
delete(api.filters, filterID)
api.filtersMu.Unlock()
return
}
}
}(logsSub.eventCh)
return filterID, err
}
// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (e *PublicFilterAPI) GetLogs(criteria filters.FilterCriteria) ([]*ethtypes.Log, error) {
filter := NewFilter(e.backend, &criteria)
return filter.getFilterLogs()
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([]*ethtypes.Log, error) {
var filter *Filter
if crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, crit)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
if crit.FromBlock != nil {
begin = crit.FromBlock.Int64()
}
end := rpc.LatestBlockNumber.Int64()
if crit.ToBlock != nil {
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
return returnLogs(logs), err
}
// UninstallFilter removes the filter with the given filter id.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMu.Lock()
f, found := api.filters[id]
if found {
delete(api.filters, id)
}
api.filtersMu.Unlock()
if !found {
return false
}
f.s.Unsubscribe(api.events)
return true
}
// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ethtypes.Log, error) {
api.filtersMu.Lock()
f, found := api.filters[id]
api.filtersMu.Unlock()
if !found {
return returnLogs(nil), fmt.Errorf("filter %s not found", id)
}
if f.typ != filters.LogsSubscription {
return returnLogs(nil), fmt.Errorf("filter %s doesn't have a LogsSubscription type: got %d", id, f.typ)
}
var filter *Filter
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, f.crit)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
if f.crit.FromBlock != nil {
begin = f.crit.FromBlock.Int64()
}
end := rpc.LatestBlockNumber.Int64()
if f.crit.ToBlock != nil {
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
return returnLogs(logs), nil
}
// GetFilterChanges returns the logs for the filter with the given id since
// last time it was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
f, found := api.filters[id]
if !found {
return nil, fmt.Errorf("filter %s not found", id)
}
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(deadline)
switch f.typ {
case filters.PendingTransactionsSubscription, filters.BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case filters.LogsSubscription, filters.MinedAndPendingLogsSubscription:
logs := make([]*ethtypes.Log, len(f.logs))
copy(logs, f.logs)
f.logs = []*ethtypes.Log{}
return returnLogs(logs), nil
default:
return nil, fmt.Errorf("invalid filter %s type %d", id, f.typ)
}
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
// otherwise the given hashes array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}
// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
// otherwise the given logs array is returned.
func returnLogs(logs []*ethtypes.Log) []*ethtypes.Log {
if logs == nil {
return []*ethtypes.Log{}
}
return logs
}

433
rpc/filter_system.go Normal file
View File

@ -0,0 +1,433 @@
package rpc
import (
"context"
"fmt"
"log"
"time"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
rpcclient "github.com/tendermint/tendermint/rpc/client"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc"
sdk "github.com/cosmos/cosmos-sdk/types"
evmtypes "github.com/cosmos/ethermint/x/evm/types"
)
var (
txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String()
evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx, sdk.EventTypeMessage, sdk.AttributeKeyModule, evmtypes.ModuleName)).String()
headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String()
)
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria using the Tendermint's RPC client.
type EventSystem struct {
ctx context.Context
client rpcclient.Client
// light client mode
lightMode bool
index filterIndex
// Subscriptions
txsSub *Subscription // Subscription for new transaction event
logsSub *Subscription // Subscription for new log event
// rmLogsSub *Subscription // Subscription for removed log event
pendingLogsSub *Subscription // Subscription for pending log event
chainSub *Subscription // Subscription for new chain event
// Channels
install chan *Subscription // install filter for event notification
uninstall chan *Subscription // remove filter for event notification
// Unidirectional channels to receive Tendermint ResultEvents
txsCh <-chan coretypes.ResultEvent // Channel to receive new pending transactions event
logsCh <-chan coretypes.ResultEvent // Channel to receive new log event
pendingLogsCh <-chan coretypes.ResultEvent // Channel to receive new pending log event
// rmLogsCh <-chan coretypes.ResultEvent // Channel to receive removed log event
chainCh <-chan coretypes.ResultEvent // Channel to receive new chain event
}
// NewEventSystem creates a new manager that listens for event on the given mux,
// parses and filters them. It uses the all map to retrieve filter changes. The
// work loop holds its own index that is used to forward events to filters.
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(client rpcclient.Client) *EventSystem {
index := make(filterIndex)
for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*Subscription)
}
es := &EventSystem{
ctx: context.Background(),
client: client,
lightMode: false,
index: index,
install: make(chan *Subscription),
uninstall: make(chan *Subscription),
txsCh: make(<-chan coretypes.ResultEvent),
logsCh: make(<-chan coretypes.ResultEvent),
pendingLogsCh: make(<-chan coretypes.ResultEvent),
// rmLogsCh: make(<-chan coretypes.ResultEvent),
chainCh: make(<-chan coretypes.ResultEvent),
}
go es.eventLoop()
return es
}
// WithContext sets a new context to the EventSystem. This is required to set a timeout context when
// a new filter is intantiated.
func (es *EventSystem) WithContext(ctx context.Context) {
es.ctx = ctx
}
// subscribe performs a new event subscription to a given Tendermint event.
// The subscription creates a unidirectional receive event channel to receive the ResultEvent. By
// default, the subscription timeouts (i.e is canceled) after 5 minutes. This function returns an
// error if the subscription fails (eg: if the identifier is already subscribed) or if the filter
// type is invalid.
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) {
var (
err error
cancelFn context.CancelFunc
eventCh <-chan coretypes.ResultEvent
)
es.ctx, cancelFn = context.WithTimeout(context.Background(), deadline)
switch sub.typ {
case filters.PendingTransactionsSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
case filters.PendingLogsSubscription, filters.MinedAndPendingLogsSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
case filters.LogsSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
case filters.BlocksSubscription:
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
default:
err = fmt.Errorf("invalid filter subscription type %d", sub.typ)
}
if err != nil {
sub.err <- err
return nil, cancelFn, err
}
// wrap events in a go routine to prevent blocking
go func() {
es.install <- sub
<-sub.installed
}()
sub.eventCh = eventCh
return sub, cancelFn, nil
}
// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
} else {
from = rpc.BlockNumber(crit.FromBlock.Int64())
}
if crit.ToBlock == nil {
to = rpc.LatestBlockNumber
} else {
to = rpc.BlockNumber(crit.ToBlock.Int64())
}
switch {
// only interested in pending logs
case from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber:
return es.subscribePendingLogs(crit)
// only interested in new mined logs, mined logs within a specific block range, or
// logs from a specific block number to new mined blocks
case (from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber),
(from >= 0 && to >= 0 && to >= from):
return es.subscribeLogs(crit)
// interested in mined logs from a specific block number, new logs and pending logs
case from >= rpc.LatestBlockNumber && (to == rpc.PendingBlockNumber || to == rpc.LatestBlockNumber):
return es.subscribeMinedPendingLogs(crit)
default:
return nil, nil, fmt.Errorf("invalid from and to block combination: from > to (%d > %d)", from, to)
}
}
// subscribeMinedPendingLogs creates a subscription that returned mined and
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.MinedAndPendingLogsSubscription,
event: evmEvents,
logsCrit: crit,
created: time.Now().UTC(),
logs: make(chan []*ethtypes.Log),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
}
return es.subscribe(sub)
}
// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.LogsSubscription,
event: evmEvents,
logsCrit: crit,
created: time.Now().UTC(),
logs: make(chan []*ethtypes.Log),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
}
return es.subscribe(sub)
}
// subscribePendingLogs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.PendingLogsSubscription,
event: evmEvents,
logsCrit: crit,
created: time.Now().UTC(),
logs: make(chan []*ethtypes.Log),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
}
return es.subscribe(sub)
}
// SubscribeNewHeads subscribes to new block headers events.
func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.BlocksSubscription,
event: headerEvents,
created: time.Now().UTC(),
headers: make(chan *ethtypes.Header),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
}
return es.subscribe(sub)
}
// SubscribePendingTxs subscribes to new pending transactions events from the mempool.
func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.PendingTransactionsSubscription,
event: txEvents,
created: time.Now().UTC(),
hashes: make(chan []common.Hash),
installed: make(chan struct{}, 1),
err: make(chan error, 1),
}
return es.subscribe(sub)
}
type filterIndex map[filters.Type]map[rpc.ID]*Subscription
func (es *EventSystem) handleLogs(ev coretypes.ResultEvent) {
data, _ := ev.Data.(tmtypes.EventDataTx)
resultData, err := evmtypes.DecodeResultData(data.TxResult.Result.Data)
if err != nil {
return
}
if len(resultData.Logs) == 0 {
return
}
for _, f := range es.index[filters.LogsSubscription] {
matchedLogs := filterLogs(resultData.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
func (es *EventSystem) handleTxsEvent(ev coretypes.ResultEvent) {
data, _ := ev.Data.(tmtypes.EventDataTx)
for _, f := range es.index[filters.PendingTransactionsSubscription] {
f.hashes <- []common.Hash{common.BytesToHash(data.Tx.Hash())}
}
}
func (es *EventSystem) handleChainEvent(ev coretypes.ResultEvent) {
data, _ := ev.Data.(tmtypes.EventDataNewBlockHeader)
for _, f := range es.index[filters.BlocksSubscription] {
f.headers <- EthHeaderFromTendermint(data.Header)
}
// TODO: light client
}
// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
var (
err error
cancelPendingTxsSubs, cancelLogsSubs, cancelPendingLogsSubs, cancelHeaderSubs context.CancelFunc
)
// Subscribe events
es.txsSub, cancelPendingTxsSubs, err = es.SubscribePendingTxs()
if err != nil {
panic(fmt.Errorf("failed to subscribe pending txs: %w", err))
}
defer cancelPendingTxsSubs()
es.logsSub, cancelLogsSubs, err = es.SubscribeLogs(filters.FilterCriteria{})
if err != nil {
panic(fmt.Errorf("failed to subscribe logs: %w", err))
}
defer cancelLogsSubs()
es.pendingLogsSub, cancelPendingLogsSubs, err = es.subscribePendingLogs(filters.FilterCriteria{})
if err != nil {
panic(fmt.Errorf("failed to subscribe pending logs: %w", err))
}
defer cancelPendingLogsSubs()
es.chainSub, cancelHeaderSubs, err = es.SubscribeNewHeads()
if err != nil {
panic(fmt.Errorf("failed to subscribe headers: %w", err))
}
defer cancelHeaderSubs()
// Ensure all subscriptions get cleaned up
defer func() {
es.txsSub.Unsubscribe(es)
es.logsSub.Unsubscribe(es)
// es.rmLogsSub.Unsubscribe(es)
es.pendingLogsSub.Unsubscribe(es)
es.chainSub.Unsubscribe(es)
}()
for {
select {
case txEvent := <-es.txsSub.eventCh:
es.handleTxsEvent(txEvent)
case headerEv := <-es.chainSub.eventCh:
es.handleChainEvent(headerEv)
case logsEv := <-es.logsSub.eventCh:
es.handleLogs(logsEv)
// TODO: figure out how to handle removed logs
// case logsEv := <-es.rmLogsSub.eventCh:
// es.handleLogs(logsEv)
case logsEv := <-es.pendingLogsSub.eventCh:
es.handleLogs(logsEv)
case f := <-es.install:
if f.typ == filters.MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
es.index[filters.LogsSubscription][f.id] = f
es.index[filters.PendingLogsSubscription][f.id] = f
} else {
es.index[f.typ][f.id] = f
}
close(f.installed)
case f := <-es.uninstall:
if f.typ == filters.MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
delete(es.index[filters.LogsSubscription], f.id)
delete(es.index[filters.PendingLogsSubscription], f.id)
} else {
delete(es.index[f.typ], f.id)
}
close(f.err)
// System stopped
case <-es.txsSub.Err():
return
case <-es.logsSub.Err():
return
// case <-es.rmLogsSub.Err():
// return
case <-es.pendingLogsSub.Err():
return
case <-es.chainSub.Err():
return
}
}
// }()
}
// Subscription defines a wrapper for the private subscription
type Subscription struct {
id rpc.ID
typ filters.Type
event string
created time.Time
logsCrit filters.FilterCriteria
logs chan []*ethtypes.Log
hashes chan []common.Hash
headers chan *ethtypes.Header
installed chan struct{} // closed when the filter is installed
eventCh <-chan coretypes.ResultEvent
err chan error
}
// ID returns the underlying subscription RPC identifier.
func (s Subscription) ID() rpc.ID {
return s.id
}
// Unsubscribe to the current subscription from Tendermint Websocket. It sends an error to the
// subscription error channel if unsubscription fails.
func (s *Subscription) Unsubscribe(es *EventSystem) {
if err := es.client.Unsubscribe(es.ctx, string(s.ID()), s.event); err != nil {
s.err <- err
}
go func() {
defer func() {
log.Println("successfully unsubscribed to event", s.event)
}()
uninstallLoop:
for {
// write uninstall request and consume logs/hashes. This prevents
// the eventLoop broadcast method to deadlock when writing to the
// filter event channel while the subscription loop is waiting for
// this method to return (and thus not reading these events).
select {
case es.uninstall <- s:
break uninstallLoop
case <-s.logs:
case <-s.hashes:
case <-s.headers:
}
}
}()
}
// Err returns the error channel
func (s *Subscription) Err() <-chan error {
return s.err
}

View File

@ -1,287 +1,168 @@
package rpc
import (
"errors"
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/bloombits"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/log"
)
/*
- Filter functions derived from go-ethereum
Used to set the criteria passed in from RPC params
*/
const blockFilter = "block"
const pendingTxFilter = "pending"
const logFilter = "log"
// Filter can be used to retrieve and filter logs, blocks, or pending transactions.
// Filter can be used to retrieve and filter logs.
type Filter struct {
backend Backend
fromBlock, toBlock *big.Int // start and end block numbers
addresses []common.Address // contract addresses to watch
topics [][]common.Hash // log topics to watch for
blockHash *common.Hash // Block hash if filtering a single block
typ string
hashes []common.Hash // filtered block or transaction hashes
logs []*ethtypes.Log //nolint // filtered logs
stopped bool // set to true once filter in uninstalled
err error
backend FiltersBackend
criteria filters.FilterCriteria
matcher *bloombits.Matcher
}
// NewFilter returns a new Filter
func NewFilter(backend Backend, criteria *filters.FilterCriteria) *Filter {
filter := &Filter{
backend: backend,
fromBlock: criteria.FromBlock,
toBlock: criteria.ToBlock,
addresses: criteria.Addresses,
topics: criteria.Topics,
typ: logFilter,
stopped: false,
// NewBlockFilter creates a new filter which directly inspects the contents of
// a block to figure out whether it is interesting or not.
func NewBlockFilter(backend FiltersBackend, criteria filters.FilterCriteria) *Filter {
// Create a generic filter and convert it into a block filter
return newFilter(backend, criteria, nil)
}
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
// figure out whether a particular block is interesting or not.
func NewRangeFilter(backend FiltersBackend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
// Flatten the address and topic filter clauses into a single bloombits filter
// system. Since the bloombits are not positional, nil topics are permitted,
// which get flattened into a nil byte slice.
var filtersBz [][][]byte // nolint: prealloc
if len(addresses) > 0 {
filter := make([][]byte, len(addresses))
for i, address := range addresses {
filter[i] = address.Bytes()
}
filtersBz = append(filtersBz, filter)
}
return filter
for _, topicList := range topics {
filter := make([][]byte, len(topicList))
for i, topic := range topicList {
filter[i] = topic.Bytes()
}
filtersBz = append(filtersBz, filter)
}
size, _ := backend.BloomStatus()
// Create a generic filter and convert it into a range filter
criteria := filters.FilterCriteria{
FromBlock: big.NewInt(begin),
ToBlock: big.NewInt(end),
Addresses: addresses,
Topics: topics,
}
return newFilter(backend, criteria, bloombits.NewMatcher(size, filtersBz))
}
// NewFilterWithBlockHash returns a new Filter with a blockHash.
func NewFilterWithBlockHash(backend Backend, criteria *filters.FilterCriteria) *Filter {
// newFilter returns a new Filter
func newFilter(backend FiltersBackend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter {
return &Filter{
backend: backend,
fromBlock: criteria.FromBlock,
toBlock: criteria.ToBlock,
addresses: criteria.Addresses,
topics: criteria.Topics,
blockHash: criteria.BlockHash,
typ: logFilter,
backend: backend,
criteria: criteria,
matcher: matcher,
}
}
// NewBlockFilter creates a new filter that notifies when a block arrives.
func NewBlockFilter(backend Backend) *Filter {
filter := NewFilter(backend, &filters.FilterCriteria{})
filter.typ = blockFilter
// Logs searches the blockchain for matching log entries, returning all from the
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) {
logs := []*ethtypes.Log{}
var err error
go func() {
err := filter.pollForBlocks()
if err != nil {
filter.err = err
}
}()
return filter
}
func (f *Filter) pollForBlocks() error {
prev := hexutil.Uint64(0)
for {
if f.stopped {
return nil
}
num, err := f.backend.BlockNumber()
if err != nil {
return err
}
if num == prev {
continue
}
block, err := f.backend.GetBlockByNumber(BlockNumber(num), false)
if err != nil {
return err
}
hashBytes, ok := block["hash"].(hexutil.Bytes)
if !ok {
return errors.New("could not convert block hash to hexutil.Bytes")
}
hash := common.BytesToHash(hashBytes)
f.hashes = append(f.hashes, hash)
prev = num
// TODO: should we add a delay?
}
}
func (f *Filter) pollForTransactions() error {
for {
if f.stopped {
return nil
}
txs, err := f.backend.PendingTransactions()
if err != nil {
return err
}
for _, tx := range txs {
if !contains(f.hashes, tx.Hash) {
f.hashes = append(f.hashes, tx.Hash)
}
}
<-time.After(1 * time.Second)
}
}
func contains(slice []common.Hash, item common.Hash) bool {
set := make(map[common.Hash]struct{}, len(slice))
for _, s := range slice {
set[s] = struct{}{}
}
_, ok := set[item]
return ok
}
// NewPendingTransactionFilter creates a new filter that notifies when a pending transaction arrives.
func NewPendingTransactionFilter(backend Backend) *Filter {
filter := NewFilter(backend, &filters.FilterCriteria{})
filter.typ = pendingTxFilter
go func() {
err := filter.pollForTransactions()
if err != nil {
filter.err = err
}
}()
return filter
}
func (f *Filter) uninstallFilter() {
f.stopped = true
}
func (f *Filter) getFilterChanges() (interface{}, error) {
switch f.typ {
case blockFilter:
if f.err != nil {
return nil, f.err
}
blocks := make([]common.Hash, len(f.hashes))
copy(blocks, f.hashes)
f.hashes = []common.Hash{}
return blocks, nil
case pendingTxFilter:
if f.err != nil {
return nil, f.err
}
txs := make([]common.Hash, len(f.hashes))
copy(txs, f.hashes)
f.hashes = []common.Hash{}
return txs, nil
case logFilter:
return f.getFilterLogs()
}
return nil, errors.New("unsupported filter")
}
func (f *Filter) getFilterLogs() ([]*ethtypes.Log, error) {
ret := []*ethtypes.Log{}
// filter specific block only
if f.blockHash != nil {
block, err := f.backend.GetBlockByHash(*f.blockHash, true)
// If we're doing singleton block filtering, execute and return
if f.criteria.BlockHash != nil && f.criteria.BlockHash != (&common.Hash{}) {
header, err := f.backend.HeaderByHash(*f.criteria.BlockHash)
if err != nil {
return nil, err
}
// if the logsBloom == 0, there are no logs in that block
if txs, ok := block["transactions"].([]common.Hash); !ok {
return ret, nil
} else if len(txs) != 0 {
return f.checkMatches(block)
if header == nil {
return nil, fmt.Errorf("unknown block header %s", f.criteria.BlockHash.String())
}
return f.blockLogs(header)
}
// filter range of blocks
num, err := f.backend.BlockNumber()
// Figure out the limits of the filter range
header, err := f.backend.HeaderByNumber(LatestBlockNumber)
if err != nil {
return nil, err
}
// if f.fromBlock is set to 0, set it to the latest block number
if f.fromBlock == nil || f.fromBlock.Cmp(big.NewInt(0)) == 0 {
f.fromBlock = big.NewInt(int64(num))
if header == nil || header.Number == nil {
return nil, nil
}
// if f.toBlock is set to 0, set it to the latest block number
if f.toBlock == nil || f.toBlock.Cmp(big.NewInt(0)) == 0 {
f.toBlock = big.NewInt(int64(num))
head := header.Number.Int64()
if f.criteria.FromBlock.Int64() == -1 {
f.criteria.FromBlock = big.NewInt(head)
}
if f.criteria.ToBlock.Int64() == -1 {
f.criteria.ToBlock = big.NewInt(head)
}
log.Debug("[ethAPI] Retrieving filter logs", "fromBlock", f.fromBlock, "toBlock", f.toBlock,
"topics", f.topics, "addresses", f.addresses)
from := f.fromBlock.Int64()
to := f.toBlock.Int64()
for i := from; i <= to; i++ {
block, err := f.backend.GetBlockByNumber(NewBlockNumber(big.NewInt(i)), true)
for i := f.criteria.FromBlock.Int64(); i <= f.criteria.ToBlock.Int64(); i++ {
block, err := f.backend.GetBlockByNumber(BlockNumber(i), true)
if err != nil {
f.err = err
log.Debug("[ethAPI] Cannot get block", "block", block["number"], "error", err)
break
return logs, err
}
log.Debug("[ethAPI] filtering", "block", block)
// TODO: block logsBloom is often set in the wrong block
// if the logsBloom == 0, there are no logs in that block
if txs, ok := block["transactions"].([]common.Hash); !ok {
txs, ok := block["transactions"].([]common.Hash)
if !ok || len(txs) == 0 {
continue
} else if len(txs) != 0 {
logs, err := f.checkMatches(block)
if err != nil {
f.err = err
break
}
ret = append(ret, logs...)
}
logsMatched := f.checkMatches(txs)
logs = append(logs, logsMatched...)
}
return ret, nil
return logs, nil
}
func (f *Filter) checkMatches(block map[string]interface{}) ([]*ethtypes.Log, error) {
transactions, ok := block["transactions"].([]common.Hash)
if !ok {
return nil, errors.New("invalid block transactions")
// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) {
if !bloomFilter(header.Bloom, f.criteria.Addresses, f.criteria.Topics) {
return []*ethtypes.Log{}, nil
}
unfiltered := []*ethtypes.Log{}
logsList, err := f.backend.GetLogs(header.Hash())
if err != nil {
return []*ethtypes.Log{}, err
}
var unfiltered []*ethtypes.Log // nolint: prealloc
for _, logs := range logsList {
unfiltered = append(unfiltered, logs...)
}
logs := filterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics)
if len(logs) == 0 {
return []*ethtypes.Log{}, nil
}
return logs, nil
}
// checkMatches checks if the logs from the a list of transactions transaction
// contain any log events that match the filter criteria. This function is
// called when the bloom filter signals a potential match.
func (f *Filter) checkMatches(transactions []common.Hash) []*ethtypes.Log {
unfiltered := []*ethtypes.Log{}
for _, tx := range transactions {
logs, err := f.backend.GetTransactionLogs(common.BytesToHash(tx[:]))
logs, err := f.backend.GetTransactionLogs(tx)
if err != nil {
return nil, err
// ignore error if transaction didn't set any logs (eg: when tx type is not
// MsgEthereumTx or MsgEthermint)
continue
}
unfiltered = append(unfiltered, logs...)
}
return filterLogs(unfiltered, f.fromBlock, f.toBlock, f.addresses, f.topics), nil
return filterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
}
// filterLogs creates a slice of logs matching the given criteria.
@ -305,7 +186,7 @@ Logs:
}
// If the to filtered topics is greater than the amount of topics in logs, skip.
if len(topics) > len(log.Topics) {
continue Logs
continue
}
for i, sub := range topics {
match := len(sub) == 0 // empty rule set == wildcard
@ -333,3 +214,29 @@ func includes(addresses []common.Address, a common.Address) bool {
return false
}
func bloomFilter(bloom ethtypes.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
var included bool
if len(addresses) > 0 {
for _, addr := range addresses {
if ethtypes.BloomLookup(bloom, addr) {
included = true
break
}
}
if !included {
return false
}
}
for _, sub := range topics {
included = len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
if ethtypes.BloomLookup(bloom, topic) {
included = true
break
}
}
}
return included
}

View File

@ -15,8 +15,8 @@ type PublicNetAPI struct {
networkVersion uint64
}
// NewPersonalEthAPI creates an instance of the public ETH Web3 API.
func NewPublicNetAPI(cliCtx context.CLIContext) *PublicNetAPI {
// NewPublicNetAPI creates an instance of the public Net Web3 API.
func NewPublicNetAPI(_ context.CLIContext) *PublicNetAPI {
chainID := viper.GetString(flags.FlagChainID)
// parse the chainID from a integer string
intChainID, err := strconv.ParseUint(chainID, 0, 64)

View File

@ -20,6 +20,7 @@ const (
EarliestBlockNumber = BlockNumber(1)
)
// NewBlockNumber creates a new BlockNumber instance.
func NewBlockNumber(n *big.Int) BlockNumber {
return BlockNumber(n.Int64())
}

View File

@ -276,6 +276,22 @@ func TestEth_NewBlockFilter(t *testing.T) {
require.NoError(t, err)
}
func TestEth_GetFilterChanges_BlockFilter(t *testing.T) {
rpcRes := call(t, "eth_newBlockFilter", []string{})
var ID hexutil.Bytes
err := json.Unmarshal(rpcRes.Result, &ID)
require.NoError(t, err)
time.Sleep(5 * time.Second)
changesRes := call(t, "eth_getFilterChanges", []string{ID.String()})
var hashes []ethcmn.Hash
err = json.Unmarshal(changesRes.Result, &hashes)
require.NoError(t, err)
require.GreaterOrEqual(t, len(hashes), 1)
}
func TestEth_GetFilterChanges_NoLogs(t *testing.T) {
param := make([]map[string][]string, 1)
param[0] = make(map[string][]string)
@ -286,6 +302,8 @@ func TestEth_GetFilterChanges_NoLogs(t *testing.T) {
err := json.Unmarshal(rpcRes.Result, &ID)
require.NoError(t, err)
t.Log(ID.String())
changesRes := call(t, "eth_getFilterChanges", []string{ID.String()})
var logs []*ethtypes.Log
@ -419,7 +437,6 @@ func TestEth_GetTransactionLogs(t *testing.T) {
logs := new([]*ethtypes.Log)
err := json.Unmarshal(rpcRes.Result, logs)
require.NoError(t, err)
require.Equal(t, 1, len(*logs))
}
@ -434,7 +451,6 @@ func TestEth_GetFilterChanges_NoTopics(t *testing.T) {
param[0] = make(map[string]interface{})
param[0]["topics"] = []string{}
param[0]["fromBlock"] = res.String()
param[0]["toBlock"] = zeroString // latest
// instantiate new filter
rpcRes = call(t, "eth_newFilter", param)
@ -526,7 +542,6 @@ func TestEth_GetFilterChanges_Topics_AB(t *testing.T) {
param[0] = make(map[string]interface{})
param[0]["topics"] = []string{helloTopic, worldTopic}
param[0]["fromBlock"] = res.String()
param[0]["toBlock"] = zeroString // latest
// instantiate new filter
rpcRes = call(t, "eth_newFilter", param)
@ -557,7 +572,6 @@ func TestEth_GetFilterChanges_Topics_XB(t *testing.T) {
param[0] = make(map[string]interface{})
param[0]["topics"] = []interface{}{nil, worldTopic}
param[0]["fromBlock"] = res.String()
param[0]["toBlock"] = "0x0" // latest
// instantiate new filter
rpcRes = call(t, "eth_newFilter", param)
@ -600,7 +614,6 @@ func TestEth_GetLogs_Topics_AB(t *testing.T) {
param[0] = make(map[string]interface{})
param[0]["topics"] = []string{helloTopic, worldTopic}
param[0]["fromBlock"] = res.String()
param[0]["toBlock"] = zeroString // latest
hash := deployTestContractWithFunction(t)
waitForReceipt(t, hash)
@ -637,7 +650,6 @@ func TestEth_PendingTransactionFilter(t *testing.T) {
require.NoError(t, err, string(changesRes.Result))
require.True(t, len(txs) >= 2, "could not get any txs", "changesRes.Result", string(changesRes.Result))
}
func TestBlockBloom(t *testing.T) {