From 20e9b2ede3e24eb9739740393f99457cbd83a32c Mon Sep 17 00:00:00 2001 From: Federico Kunze <31522760+fedekunze@users.noreply.github.com> Date: Fri, 3 Jul 2020 17:40:00 +0200 Subject: [PATCH] rpc: event websocket subscription (#308) * rpc: event websocket subscription * rpc: use tendermint event subscriptions * new log events * filter evm transactions * filter logs * wip: refactor filters * remove custom BlockNumber * wip: refactor rpc * HeaderByNumber and HeaderByHash * update Tendermint event system * update Filter * update EventSystem * fix lint issues * update rpc filters * upgrade to tendermint v0.33.4 * update filters * fix unsubscription * updates wip * initialize channels * cleanup go routines * pass ResultEvent channel on subscription * error channel * add block filter changes test * add eventCh loop * pass funcs in select go func, block filter working * cleanup * lint * NewFilter and GetFilterChanges working * eth_getLogs working * lint * lint * cleanup * remove logs and minor fixes * changelog * address @noot comments * revert BlockNumber removal Co-authored-by: noot --- CHANGELOG.md | 5 +- go.mod | 10 +- go.sum | 46 ++-- rpc/apis.go | 23 +- rpc/backend.go | 135 +++++++++- rpc/eth_api.go | 10 +- rpc/filter_api.go | 580 +++++++++++++++++++++++++++++++++++++++---- rpc/filter_system.go | 433 ++++++++++++++++++++++++++++++++ rpc/filters.go | 367 ++++++++++----------------- rpc/net_api.go | 4 +- rpc/types.go | 1 + tests/rpc_test.go | 24 +- 12 files changed, 1304 insertions(+), 334 deletions(-) create mode 100644 rpc/filter_system.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e6af1750..4eb12d62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,8 +65,9 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Features -* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement NewBlockFilter in rpc/filters.go which instantiates a polling block filter - * Polls for new blocks via BlockNumber rpc call; if block number changes, it requests the new block via GetBlockByNumber rpc call and adds it to its internal list of blocks +* (rpc) [\#330](https://github.com/ChainSafe/ethermint/issues/330) Implement `PublicFilterAPI`'s `EventSystem` which subscribes to Tendermint events upon `Filter` creation. +* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement `NewBlockFilter` in rpc/filters.go which instantiates a polling block filter + * Polls for new blocks via `BlockNumber` rpc call; if block number changes, it requests the new block via `GetBlockByNumber` rpc call and adds it to its internal list of blocks * Update uninstallFilter and getFilterChanges accordingly * uninstallFilter stops the polling goroutine * getFilterChanges returns the filter's internal list of block hashes and resets it diff --git a/go.mod b/go.mod index f2de0614..a854e482 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.14 require ( github.com/allegro/bigcache v1.2.1 // indirect github.com/aristanetworks/goarista v0.0.0-20200331225509-2cc472e8fbd6 // indirect - github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/cespare/cp v1.1.1 // indirect github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5 github.com/deckarep/golang-set v1.7.1 // indirect @@ -22,15 +21,16 @@ require ( github.com/regen-network/cosmos-proto v0.1.1-0.20200213154359-02baa11ea7c2 github.com/rjeczalik/notify v0.9.2 // indirect github.com/spf13/afero v1.2.2 // indirect - github.com/spf13/cobra v0.0.7 + github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.0 github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 // indirect github.com/stretchr/testify v1.6.1 github.com/tendermint/go-amino v0.15.1 - github.com/tendermint/tendermint v0.33.3 + github.com/tendermint/tendermint v0.33.4 github.com/tendermint/tm-db v0.5.1 - golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 + golang.org/x/crypto v0.0.0-20200406173513-056763e48d71 gopkg.in/yaml.v2 v2.3.0 ) -replace github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5 +// forked SDK to avoid breaking changes +replace github.com/cosmos/cosmos-sdk => github.com/Chainsafe/cosmos-sdk v0.34.4-0.20200622114457-35ea97f29c5f diff --git a/go.sum b/go.sum index 06f47c83..a5658916 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,10 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200102211924-4bcbc698314f h1:4O1om+UVU+Hfcihr1timk8YNXHxzZWgCo7ofnrZRApw= github.com/ChainSafe/go-schnorrkel v0.0.0-20200102211924-4bcbc698314f/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= +github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= +github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= +github.com/Chainsafe/cosmos-sdk v0.34.4-0.20200622114457-35ea97f29c5f h1:hLvatKcr7PZPWlwBb08oSxdfd7bN5JT0d3MKIwm3zEk= +github.com/Chainsafe/cosmos-sdk v0.34.4-0.20200622114457-35ea97f29c5f/go.mod h1:brXC4wuGawcC5pQebaWER22hzunmXFLgN8vajUh+xhE= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= @@ -42,6 +46,7 @@ github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQu github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= +github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI= github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= @@ -84,6 +89,8 @@ github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufo github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d h1:yJzD/yFppdVCf6ApMkVy8cUxV0XrxdP9rVf6D87/Mng= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/btcutil v1.0.2 h1:9iZ1Terx9fMIOtq1VrwdqfsATL9MC2l8ZrUY6YZ2uts= +github.com/btcsuite/btcutil v1.0.2/go.mod h1:j9HUFwoQRsZL3V4n+qG+CUnEGHOarIxfC3Le2Yhbcts= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= @@ -114,8 +121,6 @@ github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5 h1:Up28KmvitVSSms5m+JZUrfYjVF27LvXZVfTb+408HaM= -github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5/go.mod h1:J2RTB23kBgFKwtKd7J/gk4WwG363cA/xM0GU1Gfztw4= github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d h1:49RLWk1j44Xu4fjHb6JFYmeUnDORVwHNkDxaQ0ctCVU= github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y= github.com/cosmos/ledger-cosmos-go v0.11.1 h1:9JIYsGnXP613pb2vPjFeMMjBI5lEDsEaF6oYorTy6J4= @@ -228,8 +233,9 @@ github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaW github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4 h1:+EOh4OY6tjM6ZueeUKinl1f0U2820HzQOuf1iqMnsks= -github.com/golang/protobuf v1.4.0-rc.4/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -277,6 +283,8 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/gtank/merlin v0.1.1-0.20191105220539-8318aed1a79f h1:8N8XWLZelZNibkhM1FuF+3Ad3YIbgirjdMiVA0eUkaM= github.com/gtank/merlin v0.1.1-0.20191105220539-8318aed1a79f/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= +github.com/gtank/merlin v0.1.1 h1:eQ90iG7K9pOhtereWsmyRJ6RAwcP4tHTDBHXNg+u5is= +github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= github.com/gtank/ristretto255 v0.1.2 h1:JEqUCPA1NvLq5DwYtuzigd7ss8fwbYay9fi4/5uMzcc= github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= @@ -472,6 +480,8 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD github.com/prometheus/client_golang v1.4.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.5.0 h1:Ctq0iGpCmr3jeP77kbF2UxgvRwzWWz+4Bh9/vJTyg1A= github.com/prometheus/client_golang v1.5.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= +github.com/prometheus/client_golang v1.5.1 h1:bdHYieyGlH+6OLEk2YQha8THib30KP0/yD0YH9m6xcA= +github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -502,6 +512,8 @@ github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Ung github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= +github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/regen-network/cosmos-proto v0.1.1-0.20200213154359-02baa11ea7c2 h1:jQK1YoH972Aptd22YKgtNu5jM2X2xMGkyIENOAc71to= github.com/regen-network/cosmos-proto v0.1.1-0.20200213154359-02baa11ea7c2/go.mod h1:+r7jN10xXCypD4yBgzKOa+vgLz0riqYMHeDcKekxPvA= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= @@ -539,8 +551,8 @@ github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.6/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= -github.com/spf13/cobra v0.0.7 h1:FfTH+vuMXOas8jmfb5/M7dzEYx7LpcLb7a0LPe34uOU= -github.com/spf13/cobra v0.0.7/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= +github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8= +github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= @@ -550,6 +562,7 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.6.2/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k= +github.com/spf13/viper v1.6.3/go.mod h1:jUMtyi0/lB5yZH/FjyGAoH7IMNrIhlBf6pXZmbMDvzw= github.com/spf13/viper v1.7.0 h1:xVKxvI7ouOI5I+U9s2eeiUfMaWBVoXA3AWskkrqK0VM= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= @@ -571,8 +584,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho= -github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= @@ -590,13 +601,12 @@ github.com/tendermint/crypto v0.0.0-20191022145703-50d29ede1e15/go.mod h1:z4YtwM github.com/tendermint/go-amino v0.14.1/go.mod h1:i/UKE5Uocn+argJJBb12qTZsCDBcAYMbR92AaJVmKso= github.com/tendermint/go-amino v0.15.1 h1:D2uk35eT4iTsvJd9jWIetzthE5C0/k2QmMFkCN+4JgQ= github.com/tendermint/go-amino v0.15.1/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME= -github.com/tendermint/iavl v0.13.2 h1:O1m08/Ciy53l9IYmf75uIRVvrNsfjEbre8u/yCu/oqk= -github.com/tendermint/iavl v0.13.2/go.mod h1:vE1u0XAGXYjHykd4BLp8p/yivrw2PF1TuoljBcsQoGA= +github.com/tendermint/iavl v0.13.3 h1:expgBDY1MX+6/3sqrIxGChbTNf9N9aTJ67SH4bPchCs= +github.com/tendermint/iavl v0.13.3/go.mod h1:2lE7GiWdSvc7kvT78ncIKmkOjCnp6JEnSb2O7B9htLw= github.com/tendermint/tendermint v0.33.2/go.mod h1:25DqB7YvV1tN3tHsjWoc2vFtlwICfrub9XO6UBO+4xk= -github.com/tendermint/tendermint v0.33.3 h1:6lMqjEoCGejCzAghbvfQgmw87snGSqEhDTo/jw+W8CI= -github.com/tendermint/tendermint v0.33.3/go.mod h1:25DqB7YvV1tN3tHsjWoc2vFtlwICfrub9XO6UBO+4xk= +github.com/tendermint/tendermint v0.33.4 h1:NM3G9618yC5PaaxGrcAySc5ylc1PAANeIx42u2Re/jo= +github.com/tendermint/tendermint v0.33.4/go.mod h1:6NW9DVkvsvqmCY6wbRsOo66qGDhMXglRL70aXajvBEA= github.com/tendermint/tm-db v0.4.1/go.mod h1:JsJ6qzYkCGiGwm5GHl/H5GLI9XLb6qZX7PRe425dHAY= -github.com/tendermint/tm-db v0.5.0/go.mod h1:lSq7q5WRR/njf1LnhiZ/lIJHk2S8Y1Zyq5oP/3o9C2U= github.com/tendermint/tm-db v0.5.1 h1:H9HDq8UEA7Eeg13kdYckkgwwkQLBnJGgX4PgLJRhieY= github.com/tendermint/tm-db v0.5.1/go.mod h1:g92zWjHpCYlEvQXvy9M168Su8V1IBEeawpXVVBaK4f4= github.com/tjfoc/gmsm v1.3.0/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w= @@ -644,10 +654,13 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 h1:QmwruyY+bKbDDL0BaglrbZABEali68eoMFhTZpCjYVA= golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200406173513-056763e48d71 h1:DOmugCavvUtnUD114C1Wh+UgTgQZ4pMLzXxi1pSt+/Y= +golang.org/x/crypto v0.0.0-20200406173513-056763e48d71/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -690,6 +703,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -804,12 +818,14 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4= google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= +google.golang.org/grpc v1.28.1 h1:C1QC6KzgSiLyBabDi87BbjaGreoRgGUF5nOyvfrAZ1k= +google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.20.1 h1:ESRXHgpUBG5D2I5mmsQIyYxB/tQIZfSZ8wLyFDf/N/U= -google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5LIWeI/lY= +google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/rpc/apis.go b/rpc/apis.go index 24d5f95d..d6bb2714 100644 --- a/rpc/apis.go +++ b/rpc/apis.go @@ -10,10 +10,15 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -const Web3Namespace = "web3" -const EthNamespace = "eth" -const PersonalNamespace = "personal" -const NetNamespace = "net" +// RPC namespaces and API version +const ( + Web3Namespace = "web3" + EthNamespace = "eth" + PersonalNamespace = "personal" + NetNamespace = "net" + + apiVersion = "1.0" +) // GetRPCAPIs returns the list of all APIs func GetRPCAPIs(cliCtx context.CLIContext, key emintcrypto.PrivKeySecp256k1) []rpc.API { @@ -22,31 +27,31 @@ func GetRPCAPIs(cliCtx context.CLIContext, key emintcrypto.PrivKeySecp256k1) []r return []rpc.API{ { Namespace: Web3Namespace, - Version: "1.0", + Version: apiVersion, Service: NewPublicWeb3API(), Public: true, }, { Namespace: EthNamespace, - Version: "1.0", + Version: apiVersion, Service: NewPublicEthAPI(cliCtx, backend, nonceLock, key), Public: true, }, { Namespace: PersonalNamespace, - Version: "1.0", + Version: apiVersion, Service: NewPersonalEthAPI(cliCtx, nonceLock), Public: false, }, { Namespace: EthNamespace, - Version: "1.0", + Version: apiVersion, Service: NewPublicFilterAPI(cliCtx, backend), Public: true, }, { Namespace: NetNamespace, - Version: "1.0", + Version: apiVersion, Service: NewPublicNetAPI(cliCtx), Public: true, }, diff --git a/rpc/backend.go b/rpc/backend.go index 21dae182..b0accbc9 100644 --- a/rpc/backend.go +++ b/rpc/backend.go @@ -5,6 +5,8 @@ import ( "math/big" "strconv" + tmtypes "github.com/tendermint/tendermint/types" + evmtypes "github.com/cosmos/ethermint/x/evm/types" "github.com/cosmos/cosmos-sdk/client/context" @@ -19,20 +21,26 @@ import ( type Backend interface { // Used by block filter; also used for polling BlockNumber() (hexutil.Uint64, error) + HeaderByNumber(blockNum BlockNumber) (*ethtypes.Header, error) + HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error) GetBlockByNumber(blockNum BlockNumber, fullTx bool) (map[string]interface{}, error) GetBlockByHash(hash common.Hash, fullTx bool) (map[string]interface{}, error) getEthBlockByNumber(height int64, fullTx bool) (map[string]interface{}, error) getGasLimit() (int64, error) + // returns the logs of a given block + GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error) // Used by pending transaction filter PendingTransactions() ([]*Transaction, error) // Used by log filter GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) - // TODO: Bloom methods + BloomStatus() (uint64, uint64) } -// EthermintBackend implements Backend +var _ Backend = (*EthermintBackend)(nil) + +// EthermintBackend implements the Backend interface type EthermintBackend struct { cliCtx context.CLIContext gasLimit int64 @@ -68,7 +76,7 @@ func (e *EthermintBackend) GetBlockByNumber(blockNum BlockNumber, fullTx bool) ( // GetBlockByHash returns the block identified by hash. func (e *EthermintBackend) GetBlockByHash(hash common.Hash, fullTx bool) (map[string]interface{}, error) { - res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex())) + res, height, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex())) if err != nil { return nil, err } @@ -78,9 +86,60 @@ func (e *EthermintBackend) GetBlockByHash(hash common.Hash, fullTx bool) (map[st return nil, err } + e.cliCtx = e.cliCtx.WithHeight(height) return e.getEthBlockByNumber(out.Number, fullTx) } +// HeaderByNumber returns the block header identified by height. +func (e *EthermintBackend) HeaderByNumber(blockNum BlockNumber) (*ethtypes.Header, error) { + return e.getBlockHeader(blockNum.Int64()) +} + +// HeaderByHash returns the block header identified by hash. +func (e *EthermintBackend) HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error) { + res, height, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, blockHash.Hex())) + if err != nil { + return nil, err + } + var out evmtypes.QueryResBlockNumber + if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil { + return nil, err + } + + e.cliCtx = e.cliCtx.WithHeight(height) + return e.getBlockHeader(out.Number) +} + +func (e *EthermintBackend) getBlockHeader(height int64) (*ethtypes.Header, error) { + if height <= 0 { + // get latest block height + num, err := e.BlockNumber() + if err != nil { + return nil, err + } + + height = int64(num) + } + + block, err := e.cliCtx.Client.Block(&height) + if err != nil { + return nil, err + } + + res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryBloom, strconv.FormatInt(height, 10))) + if err != nil { + return nil, err + } + + var bloomRes evmtypes.QueryBloomFilter + e.cliCtx.Codec.MustUnmarshalJSON(res, &bloomRes) + + ethHeader := EthHeaderFromTendermint(block.Block.Header) + ethHeader.Bloom = bloomRes.Bloom + + return ethHeader, nil +} + func (e *EthermintBackend) getEthBlockByNumber(height int64, fullTx bool) (map[string]interface{}, error) { // Remove this check when 0 query is fixed ref: (https://github.com/tendermint/tendermint/issues/4014) var blkNumPtr *int64 @@ -128,7 +187,6 @@ func (e *EthermintBackend) getEthBlockByNumber(height int64, fullTx bool) (map[s var out evmtypes.QueryBloomFilter e.cliCtx.Codec.MustUnmarshalJSON(res, &out) - return formatBlock(header, block.Block.Size(), gasLimit, gasUsed, transactions, out.Bloom), nil } @@ -158,11 +216,12 @@ func (e *EthermintBackend) getGasLimit() (int64, error) { } // GetTransactionLogs returns the logs given a transaction hash. +// It returns an error if there's an encoding error. +// If no logs are found for the tx hash, the error is nil. func (e *EthermintBackend) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) { - // do we need to use the block height somewhere? ctx := e.cliCtx - res, _, err := ctx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, txHash.Hex()), nil) + res, height, err := ctx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, txHash.Hex()), nil) if err != nil { return nil, err } @@ -172,6 +231,7 @@ func (e *EthermintBackend) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.L return nil, err } + e.cliCtx = e.cliCtx.WithHeight(height) return out.Logs, nil } @@ -183,7 +243,7 @@ func (e *EthermintBackend) PendingTransactions() ([]*Transaction, error) { return nil, err } - transactions := make([]*Transaction, 0, 100) + transactions := make([]*Transaction, pendingTxs.Count) for _, tx := range pendingTxs.Txs { ethTx, err := bytesToEthTx(e.cliCtx, tx) if err != nil { @@ -201,3 +261,64 @@ func (e *EthermintBackend) PendingTransactions() ([]*Transaction, error) { return transactions, nil } + +// GetLogs returns all the logs from all the ethreum transactions in a block. +func (e *EthermintBackend) GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error) { + res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, blockHash.Hex())) + if err != nil { + return nil, err + } + + var out evmtypes.QueryResBlockNumber + if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil { + return nil, err + } + + block, err := e.cliCtx.Client.Block(&out.Number) + if err != nil { + return nil, err + } + + var blockLogs = [][]*ethtypes.Log{} + for _, tx := range block.Block.Txs { + // NOTE: we query the state in case the tx result logs are not persisted after an upgrade. + res, _, err := e.cliCtx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, common.BytesToHash(tx.Hash()).Hex()), nil) + if err != nil { + continue + } + + out := new(evmtypes.QueryETHLogs) + if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil { + return nil, err + } + + blockLogs = append(blockLogs, out.Logs) + } + + return blockLogs, nil +} + +// BloomStatus returns the BloomBitsBlocks and the number of processed sections maintained +// by the chain indexer. +func (e *EthermintBackend) BloomStatus() (uint64, uint64) { + return 4096, 0 +} + +// EthHeaderFromTendermint is an util function that returns an Ethereum Header +// from a tendermint Header. +func EthHeaderFromTendermint(header tmtypes.Header) *ethtypes.Header { + return ðtypes.Header{ + ParentHash: common.BytesToHash(header.LastBlockID.Hash.Bytes()), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: common.BytesToHash(header.AppHash), + TxHash: common.BytesToHash(header.DataHash), + ReceiptHash: common.Hash{}, + Difficulty: nil, + Number: big.NewInt(header.Height), + Time: uint64(header.Time.Unix()), + Extra: nil, + MixDigest: common.Hash{}, + Nonce: ethtypes.BlockNonce{}, + } +} diff --git a/rpc/eth_api.go b/rpc/eth_api.go index f895d03b..8dbc2932 100644 --- a/rpc/eth_api.go +++ b/rpc/eth_api.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" "github.com/cosmos/cosmos-sdk/client/context" "github.com/cosmos/cosmos-sdk/client/flags" @@ -388,7 +387,7 @@ type CallArgs struct { } // Call performs a raw contract call. -func (e *PublicEthAPI) Call(args CallArgs, blockNr rpc.BlockNumber, overrides *map[common.Address]account) (hexutil.Bytes, error) { +func (e *PublicEthAPI) Call(args CallArgs, blockNr BlockNumber, overrides *map[common.Address]account) (hexutil.Bytes, error) { simRes, err := e.doCall(args, blockNr, big.NewInt(emint.DefaultRPCGasLimit)) if err != nil { return []byte{}, err @@ -419,7 +418,7 @@ type account struct { // DoCall performs a simulated call operation through the evmtypes. It returns the // estimated gas used on the operation or an error if fails. func (e *PublicEthAPI) doCall( - args CallArgs, blockNr rpc.BlockNumber, globalGasCap *big.Int, + args CallArgs, blockNr BlockNumber, globalGasCap *big.Int, ) (*sdk.SimulationResponse, error) { // Set height for historical queries ctx := e.cliCtx @@ -561,7 +560,8 @@ func convertTransactionsToRPC(cliCtx context.CLIContext, txs []tmtypes.Tx, block for i, tx := range txs { ethTx, err := bytesToEthTx(cliCtx, tx) if err != nil { - return nil, nil, err + // continue to next transaction in case it's not a MsgEthereumTx + continue } // TODO: Remove gas usage calculation if saving gasUsed per block gasUsed.Add(gasUsed, ethTx.Fee()) @@ -602,7 +602,7 @@ func bytesToEthTx(cliCtx context.CLIContext, bz []byte) (*evmtypes.MsgEthereumTx ethTx, ok := stdTx.(evmtypes.MsgEthereumTx) if !ok { - return nil, fmt.Errorf("invalid transaction type, must be an amino encoded Ethereum transaction") + return nil, fmt.Errorf("invalid transaction type %T, expected MsgEthereumTx", stdTx) } return ðTx, nil } diff --git a/rpc/filter_api.go b/rpc/filter_api.go index 8a771146..6d2fdc84 100644 --- a/rpc/filter_api.go +++ b/rpc/filter_api.go @@ -1,79 +1,553 @@ package rpc import ( - "errors" + "context" + "fmt" + "sync" + "time" - "github.com/cosmos/cosmos-sdk/client/context" + coretypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" + "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/rpc" + + clientcontext "github.com/cosmos/cosmos-sdk/client/context" + + evmtypes "github.com/cosmos/ethermint/x/evm/types" ) -// PublicFilterAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec. -type PublicFilterAPI struct { - cliCtx context.CLIContext - backend Backend - filters map[rpc.ID]*Filter // ID to filter; TODO: change to sync.Map in case of concurrent writes +// FiltersBackend defines the methods requided by the PublicFilterAPI backend +type FiltersBackend interface { + GetBlockByNumber(blockNum BlockNumber, fullTx bool) (map[string]interface{}, error) + HeaderByNumber(blockNr BlockNumber) (*ethtypes.Header, error) + HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error) + GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error) + + GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) + BloomStatus() (uint64, uint64) } -// NewPublicEthAPI creates an instance of the public ETH Web3 API. -func NewPublicFilterAPI(cliCtx context.CLIContext, backend Backend) *PublicFilterAPI { - return &PublicFilterAPI{ +// consider a filter inactive if it has not been polled for within deadline +var deadline = 5 * time.Minute + +// filter is a helper struct that holds meta information over the filter type +// and associated subscription in the event system. +type filter struct { + typ filters.Type + deadline *time.Timer // filter is inactive when deadline triggers + hashes []common.Hash + crit filters.FilterCriteria + logs []*ethtypes.Log + s *Subscription // associated subscription in event system +} + +// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various +// information related to the Ethereum protocol such as blocks, transactions and logs. +type PublicFilterAPI struct { + cliCtx clientcontext.CLIContext + backend FiltersBackend + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter +} + +// NewPublicFilterAPI returns a new PublicFilterAPI instance. +func NewPublicFilterAPI(cliCtx clientcontext.CLIContext, backend FiltersBackend) *PublicFilterAPI { + // start the client to subscribe to Tendermint events + err := cliCtx.Client.Start() + if err != nil { + panic(err) + } + + api := &PublicFilterAPI{ cliCtx: cliCtx, backend: backend, - filters: make(map[rpc.ID]*Filter), + filters: make(map[rpc.ID]*filter), + events: NewEventSystem(cliCtx.Client), + } + + go api.timeoutLoop() + + return api +} + +// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. +// Tt is started when the api is created. +func (api *PublicFilterAPI) timeoutLoop() { + ticker := time.NewTicker(deadline) + defer ticker.Stop() + + for { + <-ticker.C + api.filtersMu.Lock() + for id, f := range api.filters { + select { + case <-f.deadline.C: + f.s.Unsubscribe(api.events) + delete(api.filters, id) + default: + continue + } + } + api.filtersMu.Unlock() } } -// NewFilter instantiates a new filter. -func (e *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) rpc.ID { - id := rpc.NewID() - e.filters[id] = NewFilter(e.backend, &criteria) - return id -} - -// NewBlockFilter instantiates a new block filter. -func (e *PublicFilterAPI) NewBlockFilter() rpc.ID { - id := rpc.NewID() - e.filters[id] = NewBlockFilter(e.backend) - return id -} - -// NewPendingTransactionFilter instantiates a new pending transaction filter. -func (e *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { - id := rpc.NewID() - e.filters[id] = NewPendingTransactionFilter(e.backend) - return id -} - -// UninstallFilter uninstalls a filter with the given ID. -func (e *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { - e.filters[id].uninstallFilter() - delete(e.filters, id) - return true -} - -// GetFilterChanges returns an array of changes since the last poll. -// If the filter is a log filter, it returns an array of Logs. -// If the filter is a block filter, it returns an array of block hashes. -// If the filter is a pending transaction filter, it returns an array of transaction hashes. -func (e *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { - if e.filters[id] == nil { - return nil, errors.New("invalid filter ID") +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// as transactions enter the pending state. +// +// It is part of the filter package because this filter can be used through the +// `eth_getFilterChanges` polling method that is also used for log filters. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newPendingTransactionFilter +func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { + pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs() + if err != nil { + // wrap error on the ID + return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", err.Error())) } - return e.filters[id].getFilterChanges() + + api.filtersMu.Lock() + api.filters[pendingTxSub.ID()] = &filter{typ: filters.PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filtersMu.Unlock() + + go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) { + defer cancelSubs() + + for { + select { + case ev := <-txsCh: + data, _ := ev.Data.(tmtypes.EventDataTx) + txHash := common.BytesToHash(data.Tx.Hash()) + + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID()]; found { + f.hashes = append(f.hashes, txHash) + } + api.filtersMu.Unlock() + case <-errCh: + api.filtersMu.Lock() + delete(api.filters, pendingTxSub.ID()) + api.filtersMu.Unlock() + } + } + }(pendingTxSub.eventCh, pendingTxSub.Err()) + + return pendingTxSub.ID() } -// GetFilterLogs returns an array of all logs matching filter with given id. -func (e *PublicFilterAPI) GetFilterLogs(id rpc.ID) ([]*ethtypes.Log, error) { - return e.filters[id].getFilterLogs() +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + ctx, cancelFn := context.WithTimeout(context.Background(), deadline) + defer cancelFn() + + api.events.WithContext(ctx) + + pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs() + if err != nil { + return nil, err + } + + go func(txsCh <-chan coretypes.ResultEvent) { + defer cancelSubs() + + for { + select { + case ev := <-txsCh: + data, _ := ev.Data.(tmtypes.EventDataTx) + txHash := common.BytesToHash(data.Tx.Hash()) + + // To keep the original behaviour, send a single tx hash in one notification. + // TODO(rjl493456442) Send a batch of tx hashes in one notification + err = notifier.Notify(rpcSub.ID, txHash) + if err != nil { + return + } + case <-rpcSub.Err(): + pendingTxSub.Unsubscribe(api.events) + return + case <-notifier.Closed(): + pendingTxSub.Unsubscribe(api.events) + return + } + } + }(pendingTxSub.eventCh) + + return rpcSub, err +} + +// NewBlockFilter creates a filter that fetches blocks that are imported into the chain. +// It is part of the filter package since polling goes with eth_getFilterChanges. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter +func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { + headerSub, cancelSubs, err := api.events.SubscribeNewHeads() + if err != nil { + // wrap error on the ID + return rpc.ID(fmt.Sprintf("error creating block filter: %s", err.Error())) + } + + api.filtersMu.Lock() + api.filters[headerSub.ID()] = &filter{typ: filters.BlocksSubscription, deadline: time.NewTimer(deadline), hashes: []common.Hash{}, s: headerSub} + api.filtersMu.Unlock() + + go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) { + defer cancelSubs() + + for { + select { + case ev := <-headersCh: + data, _ := ev.Data.(tmtypes.EventDataNewBlockHeader) + header := EthHeaderFromTendermint(data.Header) + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID()]; found { + f.hashes = append(f.hashes, header.Hash()) + } + api.filtersMu.Unlock() + case <-errCh: + api.filtersMu.Lock() + delete(api.filters, headerSub.ID()) + api.filtersMu.Unlock() + return + } + } + }(headerSub.eventCh, headerSub.Err()) + + return headerSub.ID() +} + +// NewHeads send a notification each time a new (header) block is appended to the chain. +func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + api.events.WithContext(ctx) + rpcSub := notifier.CreateSubscription() + + headersSub, cancelSubs, err := api.events.SubscribeNewHeads() + if err != nil { + return &rpc.Subscription{}, err + } + + go func(headersCh <-chan coretypes.ResultEvent) { + defer cancelSubs() + + for { + select { + case ev := <-headersCh: + data, ok := ev.Data.(tmtypes.EventDataNewBlockHeader) + if !ok { + err = fmt.Errorf("invalid event data %T, expected %s", ev.Data, tmtypes.EventNewBlockHeader) + headersSub.err <- err + return + } + + header := EthHeaderFromTendermint(data.Header) + err = notifier.Notify(rpcSub.ID, header) + if err != nil { + headersSub.err <- err + return + } + case <-rpcSub.Err(): + headersSub.Unsubscribe(api.events) + return + case <-notifier.Closed(): + headersSub.Unsubscribe(api.events) + return + } + } + }(headersSub.eventCh) + + return rpcSub, err +} + +// Logs creates a subscription that fires for all new log that match the given filter criteria. +func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + api.events.WithContext(ctx) + rpcSub := notifier.CreateSubscription() + + logsSub, cancelSubs, err := api.events.SubscribeLogs(crit) + if err != nil { + return &rpc.Subscription{}, err + } + + go func(logsCh <-chan coretypes.ResultEvent) { + defer cancelSubs() + + for { + select { + case event := <-logsCh: + // filter only events from EVM module txs + _, isMsgEthermint := event.Events[evmtypes.TypeMsgEthermint] + _, isMsgEthereumTx := event.Events[evmtypes.TypeMsgEthereumTx] + + if !(isMsgEthermint || isMsgEthereumTx) { + // ignore transaction as it's not from the evm module + return + } + + // get transaction result data + dataTx, ok := event.Data.(tmtypes.EventDataTx) + if !ok { + err = fmt.Errorf("invalid event data %T, expected %s", event.Data, tmtypes.EventTx) + logsSub.err <- err + return + } + + resultData, err := evmtypes.DecodeResultData(dataTx.TxResult.Result.Data) + if err != nil { + return + } + + logs := filterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + + for _, log := range logs { + err = notifier.Notify(rpcSub.ID, log) + if err != nil { + return + } + } + case <-rpcSub.Err(): // client send an unsubscribe request + logsSub.Unsubscribe(api.events) + return + case <-notifier.Closed(): // connection dropped + logsSub.Unsubscribe(api.events) + return + } + } + }(logsSub.eventCh) + + return rpcSub, err +} + +// NewFilter creates a new filter and returns the filter id. It can be +// used to retrieve logs when the state changes. This method cannot be +// used to fetch logs that are already stored in the state. +// +// Default criteria for the from and to block are "latest". +// Using "latest" as block number will return logs for mined blocks. +// Using "pending" as block number returns logs for not yet mined (pending) blocks. +// In case logs are removed (chain reorg) previously returned logs are returned +// again but with the removed property set to true. +// +// In case "fromBlock" > "toBlock" an error is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter +func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, error) { + var ( + filterID = rpc.ID("") + err error + ) + + logsSub, cancelSubs, err := api.events.SubscribeLogs(criteria) + if err != nil { + return rpc.ID(""), err + } + + filterID = logsSub.ID() + + api.filtersMu.Lock() + api.filters[filterID] = &filter{typ: filters.LogsSubscription, deadline: time.NewTimer(deadline), hashes: []common.Hash{}, s: logsSub} + api.filtersMu.Unlock() + + go func(eventCh <-chan coretypes.ResultEvent) { + defer cancelSubs() + + for { + select { + case event := <-eventCh: + dataTx, ok := event.Data.(tmtypes.EventDataTx) + if !ok { + err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data) + return + } + + var resultData evmtypes.ResultData + resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data) + if err != nil { + return + } + + logs := filterLogs(resultData.Logs, criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics) + + api.filtersMu.Lock() + if f, found := api.filters[filterID]; found { + f.logs = append(f.logs, logs...) + } + api.filtersMu.Unlock() + case <-logsSub.Err(): + api.filtersMu.Lock() + delete(api.filters, filterID) + api.filtersMu.Unlock() + return + } + } + }(logsSub.eventCh) + + return filterID, err } // GetLogs returns logs matching the given argument that are stored within the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs -func (e *PublicFilterAPI) GetLogs(criteria filters.FilterCriteria) ([]*ethtypes.Log, error) { - filter := NewFilter(e.backend, &criteria) - return filter.getFilterLogs() +func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit filters.FilterCriteria) ([]*ethtypes.Log, error) { + var filter *Filter + if crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, crit) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if crit.FromBlock != nil { + begin = crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if crit.ToBlock != nil { + end = crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + } + + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + + return returnLogs(logs), err +} + +// UninstallFilter removes the filter with the given filter id. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter +func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { + api.filtersMu.Lock() + f, found := api.filters[id] + if found { + delete(api.filters, id) + } + api.filtersMu.Unlock() + + if !found { + return false + } + f.s.Unsubscribe(api.events) + return true +} + +// GetFilterLogs returns the logs for the filter with the given id. +// If the filter could not be found an empty array of logs is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs +func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ethtypes.Log, error) { + api.filtersMu.Lock() + f, found := api.filters[id] + api.filtersMu.Unlock() + + if !found { + return returnLogs(nil), fmt.Errorf("filter %s not found", id) + } + + if f.typ != filters.LogsSubscription { + return returnLogs(nil), fmt.Errorf("filter %s doesn't have a LogsSubscription type: got %d", id, f.typ) + } + + var filter *Filter + if f.crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, f.crit) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if f.crit.FromBlock != nil { + begin = f.crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if f.crit.ToBlock != nil { + end = f.crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) + } + // Run the filter and return all the logs + logs, err := filter.Logs(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), nil +} + +// GetFilterChanges returns the logs for the filter with the given id since +// last time it was called. This can be used for polling. +// +// For pending transaction and block filters the result is []common.Hash. +// (pending)Log filters return []Log. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + f, found := api.filters[id] + if !found { + return nil, fmt.Errorf("filter %s not found", id) + } + + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(deadline) + + switch f.typ { + case filters.PendingTransactionsSubscription, filters.BlocksSubscription: + hashes := f.hashes + f.hashes = nil + return returnHashes(hashes), nil + case filters.LogsSubscription, filters.MinedAndPendingLogsSubscription: + logs := make([]*ethtypes.Log, len(f.logs)) + copy(logs, f.logs) + f.logs = []*ethtypes.Log{} + return returnLogs(logs), nil + default: + return nil, fmt.Errorf("invalid filter %s type %d", id, f.typ) + } +} + +// returnHashes is a helper that will return an empty hash array case the given hash array is nil, +// otherwise the given hashes array is returned. +func returnHashes(hashes []common.Hash) []common.Hash { + if hashes == nil { + return []common.Hash{} + } + return hashes +} + +// returnLogs is a helper that will return an empty log array in case the given logs array is nil, +// otherwise the given logs array is returned. +func returnLogs(logs []*ethtypes.Log) []*ethtypes.Log { + if logs == nil { + return []*ethtypes.Log{} + } + return logs } diff --git a/rpc/filter_system.go b/rpc/filter_system.go new file mode 100644 index 00000000..00e7aa96 --- /dev/null +++ b/rpc/filter_system.go @@ -0,0 +1,433 @@ +package rpc + +import ( + "context" + "fmt" + "log" + "time" + + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" + rpcclient "github.com/tendermint/tendermint/rpc/client" + coretypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" + + "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/rpc" + + sdk "github.com/cosmos/cosmos-sdk/types" + + evmtypes "github.com/cosmos/ethermint/x/evm/types" +) + +var ( + txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String() + evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx, sdk.EventTypeMessage, sdk.AttributeKeyModule, evmtypes.ModuleName)).String() + headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String() +) + +// EventSystem creates subscriptions, processes events and broadcasts them to the +// subscription which match the subscription criteria using the Tendermint's RPC client. +type EventSystem struct { + ctx context.Context + client rpcclient.Client + + // light client mode + lightMode bool + + index filterIndex + + // Subscriptions + txsSub *Subscription // Subscription for new transaction event + logsSub *Subscription // Subscription for new log event + // rmLogsSub *Subscription // Subscription for removed log event + + pendingLogsSub *Subscription // Subscription for pending log event + chainSub *Subscription // Subscription for new chain event + + // Channels + install chan *Subscription // install filter for event notification + uninstall chan *Subscription // remove filter for event notification + + // Unidirectional channels to receive Tendermint ResultEvents + txsCh <-chan coretypes.ResultEvent // Channel to receive new pending transactions event + logsCh <-chan coretypes.ResultEvent // Channel to receive new log event + pendingLogsCh <-chan coretypes.ResultEvent // Channel to receive new pending log event + // rmLogsCh <-chan coretypes.ResultEvent // Channel to receive removed log event + + chainCh <-chan coretypes.ResultEvent // Channel to receive new chain event +} + +// NewEventSystem creates a new manager that listens for event on the given mux, +// parses and filters them. It uses the all map to retrieve filter changes. The +// work loop holds its own index that is used to forward events to filters. +// +// The returned manager has a loop that needs to be stopped with the Stop function +// or by stopping the given mux. +func NewEventSystem(client rpcclient.Client) *EventSystem { + index := make(filterIndex) + for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { + index[i] = make(map[rpc.ID]*Subscription) + } + + es := &EventSystem{ + ctx: context.Background(), + client: client, + lightMode: false, + index: index, + install: make(chan *Subscription), + uninstall: make(chan *Subscription), + txsCh: make(<-chan coretypes.ResultEvent), + logsCh: make(<-chan coretypes.ResultEvent), + pendingLogsCh: make(<-chan coretypes.ResultEvent), + // rmLogsCh: make(<-chan coretypes.ResultEvent), + chainCh: make(<-chan coretypes.ResultEvent), + } + + go es.eventLoop() + return es +} + +// WithContext sets a new context to the EventSystem. This is required to set a timeout context when +// a new filter is intantiated. +func (es *EventSystem) WithContext(ctx context.Context) { + es.ctx = ctx +} + +// subscribe performs a new event subscription to a given Tendermint event. +// The subscription creates a unidirectional receive event channel to receive the ResultEvent. By +// default, the subscription timeouts (i.e is canceled) after 5 minutes. This function returns an +// error if the subscription fails (eg: if the identifier is already subscribed) or if the filter +// type is invalid. +func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) { + var ( + err error + cancelFn context.CancelFunc + eventCh <-chan coretypes.ResultEvent + ) + + es.ctx, cancelFn = context.WithTimeout(context.Background(), deadline) + + switch sub.typ { + case filters.PendingTransactionsSubscription: + eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) + case filters.PendingLogsSubscription, filters.MinedAndPendingLogsSubscription: + eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) + case filters.LogsSubscription: + eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) + case filters.BlocksSubscription: + eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event) + default: + err = fmt.Errorf("invalid filter subscription type %d", sub.typ) + } + + if err != nil { + sub.err <- err + return nil, cancelFn, err + } + + // wrap events in a go routine to prevent blocking + go func() { + es.install <- sub + <-sub.installed + }() + + sub.eventCh = eventCh + return sub, cancelFn, nil +} + +// SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. Default value for the from and to +// block is "latest". If the fromBlock > toBlock an error is returned. +func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { + var from, to rpc.BlockNumber + if crit.FromBlock == nil { + from = rpc.LatestBlockNumber + } else { + from = rpc.BlockNumber(crit.FromBlock.Int64()) + } + if crit.ToBlock == nil { + to = rpc.LatestBlockNumber + } else { + to = rpc.BlockNumber(crit.ToBlock.Int64()) + } + + switch { + // only interested in pending logs + case from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber: + return es.subscribePendingLogs(crit) + + // only interested in new mined logs, mined logs within a specific block range, or + // logs from a specific block number to new mined blocks + case (from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber), + (from >= 0 && to >= 0 && to >= from): + return es.subscribeLogs(crit) + + // interested in mined logs from a specific block number, new logs and pending logs + case from >= rpc.LatestBlockNumber && (to == rpc.PendingBlockNumber || to == rpc.LatestBlockNumber): + return es.subscribeMinedPendingLogs(crit) + + default: + return nil, nil, fmt.Errorf("invalid from and to block combination: from > to (%d > %d)", from, to) + } +} + +// subscribeMinedPendingLogs creates a subscription that returned mined and +// pending logs that match the given criteria. +func (es *EventSystem) subscribeMinedPendingLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { + sub := &Subscription{ + id: rpc.NewID(), + typ: filters.MinedAndPendingLogsSubscription, + event: evmEvents, + logsCrit: crit, + created: time.Now().UTC(), + logs: make(chan []*ethtypes.Log), + installed: make(chan struct{}, 1), + err: make(chan error, 1), + } + return es.subscribe(sub) +} + +// subscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. +func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { + sub := &Subscription{ + id: rpc.NewID(), + typ: filters.LogsSubscription, + event: evmEvents, + logsCrit: crit, + created: time.Now().UTC(), + logs: make(chan []*ethtypes.Log), + installed: make(chan struct{}, 1), + err: make(chan error, 1), + } + return es.subscribe(sub) +} + +// subscribePendingLogs creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) subscribePendingLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { + sub := &Subscription{ + id: rpc.NewID(), + typ: filters.PendingLogsSubscription, + event: evmEvents, + logsCrit: crit, + created: time.Now().UTC(), + logs: make(chan []*ethtypes.Log), + installed: make(chan struct{}, 1), + err: make(chan error, 1), + } + return es.subscribe(sub) +} + +// SubscribeNewHeads subscribes to new block headers events. +func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) { + sub := &Subscription{ + id: rpc.NewID(), + typ: filters.BlocksSubscription, + event: headerEvents, + created: time.Now().UTC(), + headers: make(chan *ethtypes.Header), + installed: make(chan struct{}, 1), + err: make(chan error, 1), + } + return es.subscribe(sub) +} + +// SubscribePendingTxs subscribes to new pending transactions events from the mempool. +func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) { + sub := &Subscription{ + id: rpc.NewID(), + typ: filters.PendingTransactionsSubscription, + event: txEvents, + created: time.Now().UTC(), + hashes: make(chan []common.Hash), + installed: make(chan struct{}, 1), + err: make(chan error, 1), + } + return es.subscribe(sub) +} + +type filterIndex map[filters.Type]map[rpc.ID]*Subscription + +func (es *EventSystem) handleLogs(ev coretypes.ResultEvent) { + data, _ := ev.Data.(tmtypes.EventDataTx) + resultData, err := evmtypes.DecodeResultData(data.TxResult.Result.Data) + if err != nil { + return + } + + if len(resultData.Logs) == 0 { + return + } + for _, f := range es.index[filters.LogsSubscription] { + matchedLogs := filterLogs(resultData.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } +} + +func (es *EventSystem) handleTxsEvent(ev coretypes.ResultEvent) { + data, _ := ev.Data.(tmtypes.EventDataTx) + for _, f := range es.index[filters.PendingTransactionsSubscription] { + f.hashes <- []common.Hash{common.BytesToHash(data.Tx.Hash())} + } +} + +func (es *EventSystem) handleChainEvent(ev coretypes.ResultEvent) { + data, _ := ev.Data.(tmtypes.EventDataNewBlockHeader) + for _, f := range es.index[filters.BlocksSubscription] { + f.headers <- EthHeaderFromTendermint(data.Header) + } + // TODO: light client +} + +// eventLoop (un)installs filters and processes mux events. +func (es *EventSystem) eventLoop() { + var ( + err error + cancelPendingTxsSubs, cancelLogsSubs, cancelPendingLogsSubs, cancelHeaderSubs context.CancelFunc + ) + + // Subscribe events + es.txsSub, cancelPendingTxsSubs, err = es.SubscribePendingTxs() + if err != nil { + panic(fmt.Errorf("failed to subscribe pending txs: %w", err)) + } + + defer cancelPendingTxsSubs() + + es.logsSub, cancelLogsSubs, err = es.SubscribeLogs(filters.FilterCriteria{}) + if err != nil { + panic(fmt.Errorf("failed to subscribe logs: %w", err)) + } + + defer cancelLogsSubs() + + es.pendingLogsSub, cancelPendingLogsSubs, err = es.subscribePendingLogs(filters.FilterCriteria{}) + if err != nil { + panic(fmt.Errorf("failed to subscribe pending logs: %w", err)) + } + + defer cancelPendingLogsSubs() + + es.chainSub, cancelHeaderSubs, err = es.SubscribeNewHeads() + if err != nil { + panic(fmt.Errorf("failed to subscribe headers: %w", err)) + } + + defer cancelHeaderSubs() + + // Ensure all subscriptions get cleaned up + defer func() { + es.txsSub.Unsubscribe(es) + es.logsSub.Unsubscribe(es) + // es.rmLogsSub.Unsubscribe(es) + es.pendingLogsSub.Unsubscribe(es) + es.chainSub.Unsubscribe(es) + }() + + for { + select { + case txEvent := <-es.txsSub.eventCh: + es.handleTxsEvent(txEvent) + case headerEv := <-es.chainSub.eventCh: + es.handleChainEvent(headerEv) + case logsEv := <-es.logsSub.eventCh: + es.handleLogs(logsEv) + // TODO: figure out how to handle removed logs + // case logsEv := <-es.rmLogsSub.eventCh: + // es.handleLogs(logsEv) + case logsEv := <-es.pendingLogsSub.eventCh: + es.handleLogs(logsEv) + + case f := <-es.install: + if f.typ == filters.MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + es.index[filters.LogsSubscription][f.id] = f + es.index[filters.PendingLogsSubscription][f.id] = f + } else { + es.index[f.typ][f.id] = f + } + close(f.installed) + + case f := <-es.uninstall: + if f.typ == filters.MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + delete(es.index[filters.LogsSubscription], f.id) + delete(es.index[filters.PendingLogsSubscription], f.id) + } else { + delete(es.index[f.typ], f.id) + } + close(f.err) + // System stopped + case <-es.txsSub.Err(): + return + case <-es.logsSub.Err(): + return + // case <-es.rmLogsSub.Err(): + // return + case <-es.pendingLogsSub.Err(): + return + case <-es.chainSub.Err(): + return + } + } + // }() +} + +// Subscription defines a wrapper for the private subscription +type Subscription struct { + id rpc.ID + typ filters.Type + event string + created time.Time + logsCrit filters.FilterCriteria + logs chan []*ethtypes.Log + hashes chan []common.Hash + headers chan *ethtypes.Header + installed chan struct{} // closed when the filter is installed + eventCh <-chan coretypes.ResultEvent + err chan error +} + +// ID returns the underlying subscription RPC identifier. +func (s Subscription) ID() rpc.ID { + return s.id +} + +// Unsubscribe to the current subscription from Tendermint Websocket. It sends an error to the +// subscription error channel if unsubscription fails. +func (s *Subscription) Unsubscribe(es *EventSystem) { + if err := es.client.Unsubscribe(es.ctx, string(s.ID()), s.event); err != nil { + s.err <- err + } + + go func() { + defer func() { + log.Println("successfully unsubscribed to event", s.event) + }() + + uninstallLoop: + for { + // write uninstall request and consume logs/hashes. This prevents + // the eventLoop broadcast method to deadlock when writing to the + // filter event channel while the subscription loop is waiting for + // this method to return (and thus not reading these events). + select { + case es.uninstall <- s: + break uninstallLoop + case <-s.logs: + case <-s.hashes: + case <-s.headers: + } + } + }() +} + +// Err returns the error channel +func (s *Subscription) Err() <-chan error { + return s.err +} diff --git a/rpc/filters.go b/rpc/filters.go index 4f71edc0..2a94040d 100644 --- a/rpc/filters.go +++ b/rpc/filters.go @@ -1,287 +1,168 @@ package rpc import ( - "errors" + "context" + "fmt" "math/big" - "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/bloombits" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" - "github.com/ethereum/go-ethereum/log" ) -/* - - Filter functions derived from go-ethereum - Used to set the criteria passed in from RPC params -*/ - -const blockFilter = "block" -const pendingTxFilter = "pending" -const logFilter = "log" - -// Filter can be used to retrieve and filter logs, blocks, or pending transactions. +// Filter can be used to retrieve and filter logs. type Filter struct { - backend Backend - fromBlock, toBlock *big.Int // start and end block numbers - addresses []common.Address // contract addresses to watch - topics [][]common.Hash // log topics to watch for - blockHash *common.Hash // Block hash if filtering a single block - - typ string - hashes []common.Hash // filtered block or transaction hashes - logs []*ethtypes.Log //nolint // filtered logs - stopped bool // set to true once filter in uninstalled - - err error + backend FiltersBackend + criteria filters.FilterCriteria + matcher *bloombits.Matcher } -// NewFilter returns a new Filter -func NewFilter(backend Backend, criteria *filters.FilterCriteria) *Filter { - filter := &Filter{ - backend: backend, - fromBlock: criteria.FromBlock, - toBlock: criteria.ToBlock, - addresses: criteria.Addresses, - topics: criteria.Topics, - typ: logFilter, - stopped: false, +// NewBlockFilter creates a new filter which directly inspects the contents of +// a block to figure out whether it is interesting or not. +func NewBlockFilter(backend FiltersBackend, criteria filters.FilterCriteria) *Filter { + // Create a generic filter and convert it into a block filter + return newFilter(backend, criteria, nil) +} + +// NewRangeFilter creates a new filter which uses a bloom filter on blocks to +// figure out whether a particular block is interesting or not. +func NewRangeFilter(backend FiltersBackend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { + // Flatten the address and topic filter clauses into a single bloombits filter + // system. Since the bloombits are not positional, nil topics are permitted, + // which get flattened into a nil byte slice. + var filtersBz [][][]byte // nolint: prealloc + if len(addresses) > 0 { + filter := make([][]byte, len(addresses)) + for i, address := range addresses { + filter[i] = address.Bytes() + } + filtersBz = append(filtersBz, filter) } - return filter + for _, topicList := range topics { + filter := make([][]byte, len(topicList)) + for i, topic := range topicList { + filter[i] = topic.Bytes() + } + filtersBz = append(filtersBz, filter) + } + + size, _ := backend.BloomStatus() + + // Create a generic filter and convert it into a range filter + criteria := filters.FilterCriteria{ + FromBlock: big.NewInt(begin), + ToBlock: big.NewInt(end), + Addresses: addresses, + Topics: topics, + } + + return newFilter(backend, criteria, bloombits.NewMatcher(size, filtersBz)) } -// NewFilterWithBlockHash returns a new Filter with a blockHash. -func NewFilterWithBlockHash(backend Backend, criteria *filters.FilterCriteria) *Filter { +// newFilter returns a new Filter +func newFilter(backend FiltersBackend, criteria filters.FilterCriteria, matcher *bloombits.Matcher) *Filter { return &Filter{ - backend: backend, - fromBlock: criteria.FromBlock, - toBlock: criteria.ToBlock, - addresses: criteria.Addresses, - topics: criteria.Topics, - blockHash: criteria.BlockHash, - typ: logFilter, + backend: backend, + criteria: criteria, + matcher: matcher, } } -// NewBlockFilter creates a new filter that notifies when a block arrives. -func NewBlockFilter(backend Backend) *Filter { - filter := NewFilter(backend, &filters.FilterCriteria{}) - filter.typ = blockFilter +// Logs searches the blockchain for matching log entries, returning all from the +// first block that contains matches, updating the start of the filter accordingly. +func (f *Filter) Logs(_ context.Context) ([]*ethtypes.Log, error) { + logs := []*ethtypes.Log{} + var err error - go func() { - err := filter.pollForBlocks() - if err != nil { - filter.err = err - } - }() - - return filter -} - -func (f *Filter) pollForBlocks() error { - prev := hexutil.Uint64(0) - - for { - if f.stopped { - return nil - } - - num, err := f.backend.BlockNumber() - if err != nil { - return err - } - - if num == prev { - continue - } - - block, err := f.backend.GetBlockByNumber(BlockNumber(num), false) - if err != nil { - return err - } - - hashBytes, ok := block["hash"].(hexutil.Bytes) - if !ok { - return errors.New("could not convert block hash to hexutil.Bytes") - } - - hash := common.BytesToHash(hashBytes) - f.hashes = append(f.hashes, hash) - - prev = num - - // TODO: should we add a delay? - } -} - -func (f *Filter) pollForTransactions() error { - for { - if f.stopped { - return nil - } - - txs, err := f.backend.PendingTransactions() - if err != nil { - return err - } - - for _, tx := range txs { - if !contains(f.hashes, tx.Hash) { - f.hashes = append(f.hashes, tx.Hash) - } - } - - <-time.After(1 * time.Second) - - } -} - -func contains(slice []common.Hash, item common.Hash) bool { - set := make(map[common.Hash]struct{}, len(slice)) - for _, s := range slice { - set[s] = struct{}{} - } - - _, ok := set[item] - return ok -} - -// NewPendingTransactionFilter creates a new filter that notifies when a pending transaction arrives. -func NewPendingTransactionFilter(backend Backend) *Filter { - filter := NewFilter(backend, &filters.FilterCriteria{}) - filter.typ = pendingTxFilter - - go func() { - err := filter.pollForTransactions() - if err != nil { - filter.err = err - } - }() - - return filter -} - -func (f *Filter) uninstallFilter() { - f.stopped = true -} - -func (f *Filter) getFilterChanges() (interface{}, error) { - switch f.typ { - case blockFilter: - if f.err != nil { - return nil, f.err - } - - blocks := make([]common.Hash, len(f.hashes)) - copy(blocks, f.hashes) - f.hashes = []common.Hash{} - - return blocks, nil - case pendingTxFilter: - if f.err != nil { - return nil, f.err - } - - txs := make([]common.Hash, len(f.hashes)) - copy(txs, f.hashes) - f.hashes = []common.Hash{} - return txs, nil - case logFilter: - return f.getFilterLogs() - } - - return nil, errors.New("unsupported filter") -} - -func (f *Filter) getFilterLogs() ([]*ethtypes.Log, error) { - ret := []*ethtypes.Log{} - - // filter specific block only - if f.blockHash != nil { - block, err := f.backend.GetBlockByHash(*f.blockHash, true) + // If we're doing singleton block filtering, execute and return + if f.criteria.BlockHash != nil && f.criteria.BlockHash != (&common.Hash{}) { + header, err := f.backend.HeaderByHash(*f.criteria.BlockHash) if err != nil { return nil, err } - - // if the logsBloom == 0, there are no logs in that block - if txs, ok := block["transactions"].([]common.Hash); !ok { - return ret, nil - } else if len(txs) != 0 { - return f.checkMatches(block) + if header == nil { + return nil, fmt.Errorf("unknown block header %s", f.criteria.BlockHash.String()) } + return f.blockLogs(header) } - // filter range of blocks - num, err := f.backend.BlockNumber() + // Figure out the limits of the filter range + header, err := f.backend.HeaderByNumber(LatestBlockNumber) if err != nil { return nil, err } - // if f.fromBlock is set to 0, set it to the latest block number - if f.fromBlock == nil || f.fromBlock.Cmp(big.NewInt(0)) == 0 { - f.fromBlock = big.NewInt(int64(num)) + if header == nil || header.Number == nil { + return nil, nil } - // if f.toBlock is set to 0, set it to the latest block number - if f.toBlock == nil || f.toBlock.Cmp(big.NewInt(0)) == 0 { - f.toBlock = big.NewInt(int64(num)) + head := header.Number.Int64() + if f.criteria.FromBlock.Int64() == -1 { + f.criteria.FromBlock = big.NewInt(head) + } + if f.criteria.ToBlock.Int64() == -1 { + f.criteria.ToBlock = big.NewInt(head) } - log.Debug("[ethAPI] Retrieving filter logs", "fromBlock", f.fromBlock, "toBlock", f.toBlock, - "topics", f.topics, "addresses", f.addresses) - - from := f.fromBlock.Int64() - to := f.toBlock.Int64() - - for i := from; i <= to; i++ { - block, err := f.backend.GetBlockByNumber(NewBlockNumber(big.NewInt(i)), true) + for i := f.criteria.FromBlock.Int64(); i <= f.criteria.ToBlock.Int64(); i++ { + block, err := f.backend.GetBlockByNumber(BlockNumber(i), true) if err != nil { - f.err = err - log.Debug("[ethAPI] Cannot get block", "block", block["number"], "error", err) - break + return logs, err } - log.Debug("[ethAPI] filtering", "block", block) - - // TODO: block logsBloom is often set in the wrong block - // if the logsBloom == 0, there are no logs in that block - - if txs, ok := block["transactions"].([]common.Hash); !ok { + txs, ok := block["transactions"].([]common.Hash) + if !ok || len(txs) == 0 { continue - } else if len(txs) != 0 { - logs, err := f.checkMatches(block) - if err != nil { - f.err = err - break - } - - ret = append(ret, logs...) } + + logsMatched := f.checkMatches(txs) + logs = append(logs, logsMatched...) } - return ret, nil + return logs, nil } -func (f *Filter) checkMatches(block map[string]interface{}) ([]*ethtypes.Log, error) { - transactions, ok := block["transactions"].([]common.Hash) - if !ok { - return nil, errors.New("invalid block transactions") +// blockLogs returns the logs matching the filter criteria within a single block. +func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) { + if !bloomFilter(header.Bloom, f.criteria.Addresses, f.criteria.Topics) { + return []*ethtypes.Log{}, nil } - unfiltered := []*ethtypes.Log{} + logsList, err := f.backend.GetLogs(header.Hash()) + if err != nil { + return []*ethtypes.Log{}, err + } + var unfiltered []*ethtypes.Log // nolint: prealloc + for _, logs := range logsList { + unfiltered = append(unfiltered, logs...) + } + logs := filterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics) + if len(logs) == 0 { + return []*ethtypes.Log{}, nil + } + return logs, nil +} + +// checkMatches checks if the logs from the a list of transactions transaction +// contain any log events that match the filter criteria. This function is +// called when the bloom filter signals a potential match. +func (f *Filter) checkMatches(transactions []common.Hash) []*ethtypes.Log { + unfiltered := []*ethtypes.Log{} for _, tx := range transactions { - logs, err := f.backend.GetTransactionLogs(common.BytesToHash(tx[:])) + logs, err := f.backend.GetTransactionLogs(tx) if err != nil { - return nil, err + // ignore error if transaction didn't set any logs (eg: when tx type is not + // MsgEthereumTx or MsgEthermint) + continue } unfiltered = append(unfiltered, logs...) } - return filterLogs(unfiltered, f.fromBlock, f.toBlock, f.addresses, f.topics), nil + return filterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics) } // filterLogs creates a slice of logs matching the given criteria. @@ -305,7 +186,7 @@ Logs: } // If the to filtered topics is greater than the amount of topics in logs, skip. if len(topics) > len(log.Topics) { - continue Logs + continue } for i, sub := range topics { match := len(sub) == 0 // empty rule set == wildcard @@ -333,3 +214,29 @@ func includes(addresses []common.Address, a common.Address) bool { return false } + +func bloomFilter(bloom ethtypes.Bloom, addresses []common.Address, topics [][]common.Hash) bool { + var included bool + if len(addresses) > 0 { + for _, addr := range addresses { + if ethtypes.BloomLookup(bloom, addr) { + included = true + break + } + } + if !included { + return false + } + } + + for _, sub := range topics { + included = len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { + if ethtypes.BloomLookup(bloom, topic) { + included = true + break + } + } + } + return included +} diff --git a/rpc/net_api.go b/rpc/net_api.go index 56582854..013117be 100644 --- a/rpc/net_api.go +++ b/rpc/net_api.go @@ -15,8 +15,8 @@ type PublicNetAPI struct { networkVersion uint64 } -// NewPersonalEthAPI creates an instance of the public ETH Web3 API. -func NewPublicNetAPI(cliCtx context.CLIContext) *PublicNetAPI { +// NewPublicNetAPI creates an instance of the public Net Web3 API. +func NewPublicNetAPI(_ context.CLIContext) *PublicNetAPI { chainID := viper.GetString(flags.FlagChainID) // parse the chainID from a integer string intChainID, err := strconv.ParseUint(chainID, 0, 64) diff --git a/rpc/types.go b/rpc/types.go index b21314c0..935947a2 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -20,6 +20,7 @@ const ( EarliestBlockNumber = BlockNumber(1) ) +// NewBlockNumber creates a new BlockNumber instance. func NewBlockNumber(n *big.Int) BlockNumber { return BlockNumber(n.Int64()) } diff --git a/tests/rpc_test.go b/tests/rpc_test.go index 102c35f6..fde00618 100644 --- a/tests/rpc_test.go +++ b/tests/rpc_test.go @@ -276,6 +276,22 @@ func TestEth_NewBlockFilter(t *testing.T) { require.NoError(t, err) } +func TestEth_GetFilterChanges_BlockFilter(t *testing.T) { + rpcRes := call(t, "eth_newBlockFilter", []string{}) + + var ID hexutil.Bytes + err := json.Unmarshal(rpcRes.Result, &ID) + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + changesRes := call(t, "eth_getFilterChanges", []string{ID.String()}) + var hashes []ethcmn.Hash + err = json.Unmarshal(changesRes.Result, &hashes) + require.NoError(t, err) + require.GreaterOrEqual(t, len(hashes), 1) +} + func TestEth_GetFilterChanges_NoLogs(t *testing.T) { param := make([]map[string][]string, 1) param[0] = make(map[string][]string) @@ -286,6 +302,8 @@ func TestEth_GetFilterChanges_NoLogs(t *testing.T) { err := json.Unmarshal(rpcRes.Result, &ID) require.NoError(t, err) + t.Log(ID.String()) + changesRes := call(t, "eth_getFilterChanges", []string{ID.String()}) var logs []*ethtypes.Log @@ -419,7 +437,6 @@ func TestEth_GetTransactionLogs(t *testing.T) { logs := new([]*ethtypes.Log) err := json.Unmarshal(rpcRes.Result, logs) require.NoError(t, err) - require.Equal(t, 1, len(*logs)) } @@ -434,7 +451,6 @@ func TestEth_GetFilterChanges_NoTopics(t *testing.T) { param[0] = make(map[string]interface{}) param[0]["topics"] = []string{} param[0]["fromBlock"] = res.String() - param[0]["toBlock"] = zeroString // latest // instantiate new filter rpcRes = call(t, "eth_newFilter", param) @@ -526,7 +542,6 @@ func TestEth_GetFilterChanges_Topics_AB(t *testing.T) { param[0] = make(map[string]interface{}) param[0]["topics"] = []string{helloTopic, worldTopic} param[0]["fromBlock"] = res.String() - param[0]["toBlock"] = zeroString // latest // instantiate new filter rpcRes = call(t, "eth_newFilter", param) @@ -557,7 +572,6 @@ func TestEth_GetFilterChanges_Topics_XB(t *testing.T) { param[0] = make(map[string]interface{}) param[0]["topics"] = []interface{}{nil, worldTopic} param[0]["fromBlock"] = res.String() - param[0]["toBlock"] = "0x0" // latest // instantiate new filter rpcRes = call(t, "eth_newFilter", param) @@ -600,7 +614,6 @@ func TestEth_GetLogs_Topics_AB(t *testing.T) { param[0] = make(map[string]interface{}) param[0]["topics"] = []string{helloTopic, worldTopic} param[0]["fromBlock"] = res.String() - param[0]["toBlock"] = zeroString // latest hash := deployTestContractWithFunction(t) waitForReceipt(t, hash) @@ -637,7 +650,6 @@ func TestEth_PendingTransactionFilter(t *testing.T) { require.NoError(t, err, string(changesRes.Result)) require.True(t, len(txs) >= 2, "could not get any txs", "changesRes.Result", string(changesRes.Result)) - } func TestBlockBloom(t *testing.T) {