diff --git a/Cargo.lock b/Cargo.lock index a0bb00593..9d9bb075d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,7 +31,7 @@ dependencies = [ "slog-term", "slot_clock", "tempfile", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "types", "validator_dir", @@ -130,7 +130,7 @@ dependencies = [ "aes 0.5.0", "block-cipher", "ghash", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -144,7 +144,7 @@ dependencies = [ "cipher", "ctr", "ghash", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -430,9 +430,9 @@ checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" [[package]] name = "async-tls" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d85a97c4a0ecce878efd3f945f119c78a646d8975340bca0398f9bb05c30cc52" +checksum = "2f23d769dbf1838d5df5156e7b1ad404f4c463d1ac2c6aeb6cd943630f8a8400" dependencies = [ "futures-core", "futures-io", @@ -473,7 +473,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf13118df3e3dce4b5ac930641343b91b656e4e72c8f8325838b01a4b1c9d45" dependencies = [ - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "log", "url", ] @@ -599,7 +599,7 @@ dependencies = [ "store", "task_executor", "tempfile", - "tokio 0.3.5", + "tokio 0.3.6", "tree_hash", "types", ] @@ -636,7 +636,7 @@ dependencies = [ "slog-term", "store", "task_executor", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "types", ] @@ -848,7 +848,7 @@ dependencies = [ "slog-stdlog", "slog-term", "sloggers", - "tokio 0.3.5", + "tokio 0.3.6", "types", ] @@ -1099,7 +1099,7 @@ dependencies = [ "task_executor", "time 0.2.23", "timer", - "tokio 0.3.5", + "tokio 0.3.6", "toml", "tree_hash", "types", @@ -1350,7 +1350,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab" dependencies = [ "generic-array 0.14.4", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -1360,7 +1360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58bcd97a54c7ca5ce2f6eb16f6bede5b0ab5f0055fedc17d2f0b4466e21671ca" dependencies = [ "generic-array 0.14.4", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -1370,7 +1370,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" dependencies = [ "generic-array 0.14.4", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -1454,7 +1454,7 @@ dependencies = [ "byteorder", "digest 0.9.0", "rand_core 0.5.1", - "subtle 2.3.0", + "subtle 2.4.0", "zeroize", ] @@ -1649,7 +1649,7 @@ dependencies = [ "rlp", "sha2 0.9.2", "smallvec", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-util 0.5.1", "tracing", "tracing-subscriber", @@ -1751,7 +1751,7 @@ dependencies = [ "generic-array 0.14.4", "group", "rand_core 0.5.1", - "subtle 2.3.0", + "subtle 2.4.0", "zeroize", ] @@ -1832,7 +1832,7 @@ dependencies = [ "slog-term", "sloggers", "task_executor", - "tokio 0.3.5", + "tokio 0.3.6", "types", ] @@ -1871,7 +1871,7 @@ dependencies = [ "sloggers", "state_processing", "task_executor", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "toml", "tree_hash", @@ -1886,7 +1886,7 @@ dependencies = [ "deposit_contract", "futures 0.3.8", "serde_json", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "types", "web3", @@ -2025,7 +2025,7 @@ dependencies = [ "task_executor", "tempdir", "tiny-keccak 2.0.2", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-io-timeout", "tokio-util 0.4.0", "types", @@ -2215,7 +2215,7 @@ checksum = "01646e077d4ebda82b73f1bca002ea1e91561a77df2431a9e79729bcc31950ef" dependencies = [ "bitvec 0.18.4", "rand_core 0.5.1", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -2532,7 +2532,7 @@ dependencies = [ "serde_derive", "slog", "state_processing", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "tree_hash", "types", @@ -2613,7 +2613,7 @@ checksum = "cc11f9f5fbf1943b48ae7c2bf6846e7d827a512d1be4f23af708f5ca5d01dde1" dependencies = [ "ff", "rand_core 0.5.1", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -2627,7 +2627,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "indexmap", "slab", "tokio 0.2.24", @@ -2646,10 +2646,10 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http 0.2.1 (git+https://github.com/agemanning/http?branch=lighthouse)", + "http 0.2.1", "indexmap", "slab", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-util 0.5.1", "tracing", "tracing-futures", @@ -2684,7 +2684,7 @@ name = "hashset_delay" version = "0.2.0" dependencies = [ "futures 0.3.8", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-util 0.5.1", ] @@ -2698,7 +2698,7 @@ dependencies = [ "bitflags", "bytes 0.5.6", "headers-core 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "mime", "sha-1 0.8.2", "time 0.1.44", @@ -2713,7 +2713,7 @@ dependencies = [ "bitflags", "bytes 0.6.0", "headers-core 0.2.0 (git+https://github.com/blacktemplar/headers?branch=lighthouse)", - "http 0.2.1 (git+https://github.com/agemanning/http?branch=lighthouse)", + "http 0.2.1", "mime", "sha-1 0.8.2", "time 0.1.44", @@ -2725,7 +2725,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", ] [[package]] @@ -2733,7 +2733,7 @@ name = "headers-core" version = "0.2.0" source = "git+https://github.com/blacktemplar/headers?branch=lighthouse#8bffbd8aa2e170745a81e62fc0d7e98c0a23a69a" dependencies = [ - "http 0.2.1 (git+https://github.com/agemanning/http?branch=lighthouse)", + "http 0.2.1", ] [[package]] @@ -2830,20 +2830,20 @@ dependencies = [ [[package]] name = "http" version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" +source = "git+https://github.com/agemanning/http?branch=lighthouse#144a8ad6334f40bf4f84e26cf582ff164795024a" dependencies = [ - "bytes 0.5.6", + "bytes 0.6.0", "fnv", "itoa", ] [[package]] name = "http" -version = "0.2.1" -source = "git+https://github.com/agemanning/http?branch=lighthouse#144a8ad6334f40bf4f84e26cf582ff164795024a" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84129d298a6d57d246960ff8eb831ca4af3f96d29e2e28848dae275408658e26" dependencies = [ - "bytes 0.6.0", + "bytes 0.5.6", "fnv", "itoa", ] @@ -2855,7 +2855,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ "bytes 0.5.6", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", ] [[package]] @@ -2864,7 +2864,7 @@ version = "0.3.1" source = "git+https://github.com/agemanning/http-body?branch=lighthouse#a10365c24eaee8eab881519accad48b695b88ccf" dependencies = [ "bytes 0.6.0", - "http 0.2.1 (git+https://github.com/agemanning/http?branch=lighthouse)", + "http 0.2.1", ] [[package]] @@ -2892,7 +2892,7 @@ dependencies = [ "slot_clock", "state_processing", "store", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "tree_hash", "types", @@ -2917,7 +2917,7 @@ dependencies = [ "slog", "slot_clock", "store", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "types", "warp", @@ -2980,7 +2980,7 @@ dependencies = [ "futures-core", "futures-util", "h2 0.2.7", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "httparse", "httpdate", @@ -3003,14 +3003,14 @@ dependencies = [ "futures-core", "futures-util", "h2 0.3.0", - "http 0.2.1 (git+https://github.com/agemanning/http?branch=lighthouse)", + "http 0.2.1", "http-body 0.3.1 (git+https://github.com/agemanning/http-body?branch=lighthouse)", "httparse", "httpdate", "itoa", "pin-project 1.0.2", "socket2", - "tokio 0.3.5", + "tokio 0.3.6", "tower-service", "tracing", "want", @@ -3024,7 +3024,7 @@ checksum = "f93ec5be69758dfc06b9b29efa9d6e9306e387c85eb362c603912eead2ad98c7" dependencies = [ "bytes 0.5.6", "futures 0.3.8", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "hyper 0.13.9", "hyper-tls", "native-tls", @@ -3121,9 +3121,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55e2e4c765aa53a0424761bf9f41aa7a6ac1efa87238f59560640e27fca028f2" +checksum = "4fb1fa934250de4de8aef298d81c729a7d33d8c239daa3a7575e6b92bfc7313b" dependencies = [ "autocfg 1.0.1", "hashbrown", @@ -3193,7 +3193,7 @@ dependencies = [ "encoding_rs", "flume", "futures-lite", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "log", "mime", "once_cell", @@ -3354,7 +3354,7 @@ dependencies = [ "serde_yaml", "simple_logger", "state_processing", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "tree_hash", "types", @@ -3431,14 +3431,14 @@ dependencies = [ [[package]] name = "libp2p" -version = "0.30.1" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.33.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "atomic", "bytes 0.5.6", "futures 0.3.8", "lazy_static", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "libp2p-core-derive", "libp2p-dns", "libp2p-gossipsub", @@ -3449,7 +3449,7 @@ dependencies = [ "libp2p-tcp", "libp2p-websocket", "libp2p-yamux", - "parity-multiaddr 0.9.6 (git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25)", + "parity-multiaddr 0.10.0", "parking_lot", "pin-project 1.0.2", "smallvec", @@ -3473,8 +3473,8 @@ dependencies = [ "libsecp256k1", "log", "multihash 0.11.4", - "multistream-select 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", - "parity-multiaddr 0.9.6 (registry+https://github.com/rust-lang/crates.io-index)", + "multistream-select 0.8.5", + "parity-multiaddr 0.9.6", "parking_lot", "pin-project 1.0.2", "prost", @@ -3492,12 +3492,11 @@ dependencies = [ [[package]] name = "libp2p-core" -version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.26.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "asn1_der", "bs58 0.4.0", - "bytes 0.5.6", "ed25519-dalek", "either", "fnv", @@ -3507,8 +3506,8 @@ dependencies = [ "libsecp256k1", "log", "multihash 0.13.2", - "multistream-select 0.8.5 (git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25)", - "parity-multiaddr 0.9.6 (git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25)", + "multistream-select 0.9.1", + "parity-multiaddr 0.10.0", "parking_lot", "pin-project 1.0.2", "prost", @@ -3526,8 +3525,8 @@ dependencies = [ [[package]] name = "libp2p-core-derive" -version = "0.20.2" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.21.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "quote", "syn", @@ -3535,18 +3534,18 @@ dependencies = [ [[package]] name = "libp2p-dns" -version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.26.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "futures 0.3.8", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "log", ] [[package]] name = "libp2p-gossipsub" -version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.26.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "base64 0.13.0", "byteorder", @@ -3555,12 +3554,13 @@ dependencies = [ "futures 0.3.8", "futures_codec", "hex_fmt", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "libp2p-swarm", "log", "prost", "prost-build", "rand 0.7.3", + "regex", "sha2 0.9.2", "smallvec", "unsigned-varint 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3569,11 +3569,11 @@ dependencies = [ [[package]] name = "libp2p-identify" -version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.26.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "futures 0.3.8", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "libp2p-swarm", "log", "prost", @@ -3584,13 +3584,13 @@ dependencies = [ [[package]] name = "libp2p-mplex" -version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.26.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "bytes 0.5.6", "futures 0.3.8", "futures_codec", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "log", "nohash-hasher", "parking_lot", @@ -3601,14 +3601,14 @@ dependencies = [ [[package]] name = "libp2p-noise" -version = "0.27.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.28.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "bytes 0.5.6", "curve25519-dalek", "futures 0.3.8", "lazy_static", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "log", "prost", "prost-build", @@ -3622,12 +3622,12 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.26.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "either", "futures 0.3.8", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "log", "rand 0.7.3", "smallvec", @@ -3637,28 +3637,28 @@ dependencies = [ [[package]] name = "libp2p-tcp" -version = "0.25.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.26.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "futures 0.3.8", "futures-timer", "if-addrs", "ipnet", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "log", "socket2", - "tokio 0.3.5", + "tokio 0.3.6", ] [[package]] name = "libp2p-websocket" -version = "0.26.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.27.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "async-tls", "either", "futures 0.3.8", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "log", "quicksink", "rustls", @@ -3671,11 +3671,11 @@ dependencies = [ [[package]] name = "libp2p-yamux" -version = "0.28.0" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.29.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "futures 0.3.8", - "libp2p-core 0.25.0", + "libp2p-core 0.26.0", "parking_lot", "thiserror", "yamux", @@ -3693,7 +3693,7 @@ dependencies = [ "hmac-drbg", "rand 0.7.3", "sha2 0.8.2", - "subtle 2.3.0", + "subtle 2.4.0", "typenum", ] @@ -3745,7 +3745,7 @@ dependencies = [ "slog-term", "sloggers", "tempfile", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "types", "validator_client", @@ -3847,9 +3847,9 @@ dependencies = [ [[package]] name = "lru" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be716eb6878ca2263eb5d00a781aa13264a794f519fe6af4fbb2668b2d5441c0" +checksum = "3abe07af102235a56ac9a6dd904aab1e05483e2e8afdfffec3598be55b1b7606" dependencies = [ "hashbrown", ] @@ -4136,8 +4136,8 @@ dependencies = [ [[package]] name = "multistream-select" -version = "0.8.5" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.9.1" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "bytes 0.5.6", "futures 0.3.8", @@ -4223,7 +4223,7 @@ dependencies = [ "store", "task_executor", "tempfile", - "tokio 0.3.5", + "tokio 0.3.6", "tree_hash", "types", ] @@ -4481,8 +4481,8 @@ dependencies = [ [[package]] name = "parity-multiaddr" -version = "0.9.6" -source = "git+https://github.com/sigp/rust-libp2p?rev=830e6fabb7ee51281a98f5e092f056668adbef25#830e6fabb7ee51281a98f5e092f056668adbef25" +version = "0.10.0" +source = "git+https://github.com/sigp/rust-libp2p?rev=97000533e4710183124abde017c6c3d68287c1ae#97000533e4710183124abde017c6c3d68287c1ae" dependencies = [ "arrayref", "bs58 0.4.0", @@ -5303,7 +5303,7 @@ dependencies = [ "remote_signer_test", "reqwest", "serde", - "tokio 0.3.5", + "tokio 0.3.6", "types", ] @@ -5321,7 +5321,7 @@ dependencies = [ "serde", "serde_json", "tempdir", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "types", ] @@ -5337,16 +5337,16 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.10.9" +version = "0.10.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb15d6255c792356a0f578d8a645c677904dc02e862bebe2ecc18e0c01b9a0ce" +checksum = "0718f81a8e14c4dbb3b34cf23dc6aaf9ab8a0dfec160c534b3dbca1aaa21f47c" dependencies = [ "base64 0.13.0", "bytes 0.5.6", "encoding_rs", "futures-core", "futures-util", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.9", "hyper-tls", @@ -5367,7 +5367,6 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-bindgen-test", "web-sys", "winreg", ] @@ -5468,11 +5467,11 @@ dependencies = [ [[package]] name = "rustls" -version = "0.18.1" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d1126dcf58e93cee7d098dbda643b5f92ed724f1f6a63007c1116eed6700c81" +checksum = "064fd21ff87c6e87ed4506e68beb42459caa4a0e2eb144932e6776768556980b" dependencies = [ - "base64 0.12.3", + "base64 0.13.0", "log", "ring", "sct", @@ -5861,7 +5860,7 @@ dependencies = [ "node_test_rig", "parking_lot", "rayon", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "types", "validator_client", @@ -5920,7 +5919,7 @@ dependencies = [ "slot_clock", "state_processing", "task_executor", - "tokio 0.3.5", + "tokio 0.3.6", "types", ] @@ -6083,19 +6082,18 @@ dependencies = [ "ring", "rustc_version", "sha2 0.9.2", - "subtle 2.3.0", + "subtle 2.4.0", "x25519-dalek", ] [[package]] name = "socket2" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c29947abdee2a218277abeca306f25789c938e500ea5a9d4b12a5a504466902" +checksum = "97e0e9fd577458a4f61fb91fcb559ea2afecc54c934119421f9f5d3d5b1a1057" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall", "winapi 0.3.9", ] @@ -6299,9 +6297,9 @@ checksum = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee" [[package]] name = "subtle" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343f3f510c2915908f155e94f17220b19ccfacf2a64a2a5d8004f2c3e311e7fd" +checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2" [[package]] name = "swap_or_not_shuffle" @@ -6362,7 +6360,7 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "slog", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", ] @@ -6534,7 +6532,7 @@ dependencies = [ "slog", "slot_clock", "task_executor", - "tokio 0.3.5", + "tokio 0.3.6", "types", ] @@ -6625,18 +6623,18 @@ dependencies = [ [[package]] name = "tokio" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252" +checksum = "720ba21c25078711bf456d607987d95bce90f7c3bea5abe1db587862e7a1e87c" dependencies = [ "autocfg 1.0.1", "bytes 0.6.0", "futures-core", - "lazy_static", "libc", "memchr", "mio 0.7.6", "num_cpus", + "once_cell", "parking_lot", "pin-project-lite 0.2.0", "signal-hook-registry", @@ -6655,7 +6653,7 @@ dependencies = [ "once_cell", "pin-project-lite 0.1.11", "tokio 0.2.24", - "tokio 0.3.5", + "tokio 0.3.6", ] [[package]] @@ -6664,7 +6662,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6654a6da4326b0b4228000891d44fbcbdaa1904c6ddfa06617230649073be8fb" dependencies = [ - "tokio 0.3.5", + "tokio 0.3.6", ] [[package]] @@ -6708,7 +6706,7 @@ dependencies = [ "futures-util", "log", "pin-project 1.0.2", - "tokio 0.3.5", + "tokio 0.3.6", "tungstenite", ] @@ -6739,7 +6737,7 @@ dependencies = [ "log", "pin-project-lite 0.1.11", "slab", - "tokio 0.3.5", + "tokio 0.3.6", ] [[package]] @@ -6755,7 +6753,7 @@ dependencies = [ "log", "pin-project-lite 0.2.0", "slab", - "tokio 0.3.5", + "tokio 0.3.6", ] [[package]] @@ -6915,7 +6913,7 @@ dependencies = [ "base64 0.12.3", "byteorder", "bytes 0.5.6", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "httparse", "input_buffer", "log", @@ -6943,7 +6941,7 @@ dependencies = [ "base64 0.11.0", "bytes 0.5.6", "chrono", - "http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.2.2", "mime", ] @@ -7065,7 +7063,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402" dependencies = [ "generic-array 0.14.4", - "subtle 2.3.0", + "subtle 2.4.0", ] [[package]] @@ -7169,7 +7167,7 @@ dependencies = [ "slot_clock", "tempdir", "tempfile", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-compat-02", "tree_hash", "types", @@ -7261,7 +7259,7 @@ dependencies = [ "bytes 0.6.0", "futures 0.3.8", "headers 0.3.2 (git+https://github.com/blacktemplar/headers?branch=lighthouse)", - "http 0.2.1 (git+https://github.com/agemanning/http?branch=lighthouse)", + "http 0.2.1", "hyper 0.14.0-dev", "log", "mime", @@ -7273,7 +7271,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio 0.3.5", + "tokio 0.3.6", "tokio-tungstenite", "tower-service", "tracing", @@ -7292,7 +7290,7 @@ dependencies = [ "safe_arith", "serde", "state_processing", - "tokio 0.3.5", + "tokio 0.3.6", "types", "warp", ] @@ -7473,9 +7471,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f20dea7535251981a9670857150d571846545088359b28e4951d350bdaf179f" +checksum = "82015b7e0b8bad8185994674a13a93306bea76cf5a16c5a181382fd3a5ec2376" dependencies = [ "webpki", ] diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index 81ca15040..2e5871e7e 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -42,7 +42,7 @@ regex = "1.3.9" [dependencies.libp2p] #version = "0.23.0" git = "https://github.com/sigp/rust-libp2p" -rev = "830e6fabb7ee51281a98f5e092f056668adbef25" +rev = "97000533e4710183124abde017c6c3d68287c1ae" default-features = false features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"] diff --git a/beacon_node/eth2_libp2p/src/behaviour/gossipsub_scoring_parameters.rs b/beacon_node/eth2_libp2p/src/behaviour/gossipsub_scoring_parameters.rs index fe516cab5..875c6cf84 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/gossipsub_scoring_parameters.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/gossipsub_scoring_parameters.rs @@ -1,8 +1,7 @@ use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::{error, TopicHash}; use libp2p::gossipsub::{ - GenericGossipsubConfig, IdentTopic as Topic, PeerScoreParams, PeerScoreThresholds, - TopicScoreParams, + GossipsubConfig, IdentTopic as Topic, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, }; use std::cmp::max; use std::collections::HashMap; @@ -37,10 +36,7 @@ pub struct PeerScoreSettings { } impl PeerScoreSettings { - pub fn new( - chain_spec: &ChainSpec, - gs_config: &GenericGossipsubConfig, - ) -> PeerScoreSettings { + pub fn new(chain_spec: &ChainSpec, gs_config: &GossipsubConfig) -> PeerScoreSettings { let slot = Duration::from_millis(chain_spec.milliseconds_per_slot); let beacon_attestation_subnet_weight = 1.0 / chain_spec.attestation_subnet_count as f64; let max_positive_score = (MAX_IN_MESH_SCORE + MAX_FIRST_MESSAGE_DELIVERIES_SCORE) diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index b29aa6755..fb4079096 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -6,17 +6,13 @@ use crate::peer_manager::{ use crate::rpc::*; use crate::service::METADATA_FILENAME; use crate::types::{ - subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, MessageData, + subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, SubnetDiscovery, }; use crate::Eth2Enr; use crate::{error, metrics, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; use handler::{BehaviourHandler, BehaviourHandlerIn, DelegateIn, DelegateOut}; -use libp2p::gossipsub::subscription_filter::{ - MaxCountSubscriptionFilter, WhitelistSubscriptionFilter, -}; -use libp2p::gossipsub::PeerScoreThresholds; use libp2p::{ core::{ connection::{ConnectedPoint, ConnectionId, ListenerId}, @@ -24,13 +20,14 @@ use libp2p::{ Multiaddr, }, gossipsub::{ - GenericGossipsub, GenericGossipsubEvent, IdentTopic as Topic, MessageAcceptance, - MessageAuthenticity, MessageId, + subscription_filter::{MaxCountSubscriptionFilter, WhitelistSubscriptionFilter}, + Gossipsub as BaseGossipsub, GossipsubEvent, IdentTopic as Topic, MessageAcceptance, + MessageAuthenticity, MessageId, PeerScoreThresholds, }, identify::{Identify, IdentifyEvent}, swarm::{ - NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, - ProtocolsHandler, + AddressScore, NetworkBehaviour, NetworkBehaviourAction as NBAction, NotifyHandler, + PollParameters, ProtocolsHandler, }, PeerId, }; @@ -58,8 +55,7 @@ pub const GOSSIPSUB_GREYLIST_THRESHOLD: f64 = -16000.0; pub type PeerRequestId = (ConnectionId, SubstreamId); pub type SubscriptionFilter = MaxCountSubscriptionFilter; -pub type Gossipsub = GenericGossipsub; -pub type GossipsubEvent = GenericGossipsubEvent; +pub type Gossipsub = BaseGossipsub; /// The types of events than can be obtained from polling the behaviour. #[derive(Debug)] @@ -181,10 +177,14 @@ impl Behaviour { max_subscriptions_per_request: 100, //this is according to the current go implementation }; - let mut gossipsub = Gossipsub::new_with_subscription_filter( + // Initialize the compression transform. + let snappy_transform = SnappyTransform::new(net_conf.gs_config.max_transmit_size()); + + let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform( MessageAuthenticity::Anonymous, net_conf.gs_config.clone(), filter, + snappy_transform, ) .map_err(|e| format!("Could not construct gossipsub: {:?}", e))?; @@ -390,34 +390,30 @@ impl Behaviour { pub fn publish(&mut self, messages: Vec>) { for message in messages { for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) { - match message.encode(GossipEncoding::default()) { - Ok(message_data) => { - if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) { - slog::warn!(self.log, "Could not publish message"; + let message_data = message.encode(GossipEncoding::default()); + if let Err(e) = self.gossipsub.publish(topic.clone().into(), message_data) { + slog::warn!(self.log, "Could not publish message"; "error" => ?e); - // add to metrics - match topic.kind() { - GossipKind::Attestation(subnet_id) => { - if let Some(v) = metrics::get_int_gauge( - &metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET, - &[&subnet_id.to_string()], - ) { - v.inc() - }; - } - kind => { - if let Some(v) = metrics::get_int_gauge( - &metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC, - &[&format!("{:?}", kind)], - ) { - v.inc() - }; - } - } + // add to metrics + match topic.kind() { + GossipKind::Attestation(subnet_id) => { + if let Some(v) = metrics::get_int_gauge( + &metrics::FAILED_ATTESTATION_PUBLISHES_PER_SUBNET, + &[&subnet_id.to_string()], + ) { + v.inc() + }; + } + kind => { + if let Some(v) = metrics::get_int_gauge( + &metrics::FAILED_PUBLISHES_PER_MAIN_TOPIC, + &[&format!("{:?}", kind)], + ) { + v.inc() + }; } } - Err(e) => crit!(self.log, "Could not publish message"; "error" => e), } } } @@ -642,7 +638,7 @@ impl Behaviour { } => { // Note: We are keeping track here of the peer that sent us the message, not the // peer that originally published the message. - match PubsubMessage::decode(&gs_msg.topic, gs_msg.data()) { + match PubsubMessage::decode(&gs_msg.topic, &gs_msg.data) { Err(e) => { debug!(self.log, "Could not decode gossipsub message"; "error" => e); //reject the message @@ -854,7 +850,10 @@ impl Behaviour { }); } PeerManagerEvent::SocketUpdated(address) => { - return Poll::Ready(NBAction::ReportObservedAddr { address }); + return Poll::Ready(NBAction::ReportObservedAddr { + address, + score: AddressScore::Finite(1), + }); } PeerManagerEvent::Status(peer_id) => { // it's time to status. We don't keep a beacon chain reference here, so we inform @@ -1028,8 +1027,7 @@ impl NetworkBehaviour for Behaviour { trace!(self.log, "Disconnecting newly connected peer"; "peer_id" => %peer_id, "reason" => %goodbye_reason) } } - self.peers_to_dc - .push_back((peer_id.clone(), Some(goodbye_reason))); + self.peers_to_dc.push_back((*peer_id, Some(goodbye_reason))); // NOTE: We don't inform the peer manager that this peer is disconnecting. It is simply // rejected with a goodbye. return; @@ -1041,13 +1039,13 @@ impl NetworkBehaviour for Behaviour { ConnectedPoint::Listener { send_back_addr, .. } => { self.peer_manager .connect_ingoing(&peer_id, send_back_addr.clone()); - self.add_event(BehaviourEvent::PeerConnected(peer_id.clone())); + self.add_event(BehaviourEvent::PeerConnected(*peer_id)); debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Incoming"); } ConnectedPoint::Dialer { address } => { self.peer_manager .connect_outgoing(&peer_id, address.clone()); - self.add_event(BehaviourEvent::PeerDialed(peer_id.clone())); + self.add_event(BehaviourEvent::PeerDialed(*peer_id)); debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => "Dialed"); } } @@ -1122,7 +1120,7 @@ impl NetworkBehaviour for Behaviour { // Both these cases, the peer has been previously registered in the sub protocols and // potentially the application layer. // Inform the application. - self.add_event(BehaviourEvent::PeerDisconnected(peer_id.clone())); + self.add_event(BehaviourEvent::PeerDisconnected(*peer_id)); // Inform the behaviour. delegate_to_behaviours!(self, inject_disconnected, peer_id); @@ -1260,8 +1258,8 @@ impl NetworkBehaviour for Behaviour { ), }); } - NBAction::ReportObservedAddr { address } => { - return Poll::Ready(NBAction::ReportObservedAddr { address }) + NBAction::ReportObservedAddr { address, score } => { + return Poll::Ready(NBAction::ReportObservedAddr { address, score }) } }, Poll::Pending => break, diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index 017366652..8b1979b4a 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -1,12 +1,12 @@ -use crate::types::{GossipKind, MessageData}; +use crate::types::GossipKind; use crate::{Enr, PeerIdSerialized}; use directory::{ DEFAULT_BEACON_NODE_DIR, DEFAULT_HARDCODED_NETWORK, DEFAULT_NETWORK_DIR, DEFAULT_ROOT_DIR, }; use discv5::{Discv5Config, Discv5ConfigBuilder}; use libp2p::gossipsub::{ - FastMessageId, GenericGossipsubConfig, GenericGossipsubConfigBuilder, GenericGossipsubMessage, - MessageId, RawGossipsubMessage, ValidationMode, + FastMessageId, GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId, + RawGossipsubMessage, ValidationMode, }; use libp2p::Multiaddr; use serde_derive::{Deserialize, Serialize}; @@ -15,14 +15,13 @@ use std::path::PathBuf; use std::time::Duration; pub const GOSSIP_MAX_SIZE: usize = 1_048_576; -const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0, 0, 0, 0]; + +// We treat uncompressed messages as invalid and never use the INVALID_SNAPPY_DOMAIN as in the +// specification. We leave it here for posterity. +// const MESSAGE_DOMAIN_INVALID_SNAPPY: [u8; 4] = [0, 0, 0, 0]; const MESSAGE_DOMAIN_VALID_SNAPPY: [u8; 4] = [1, 0, 0, 0]; pub const MESH_N_LOW: usize = 6; -pub type GossipsubConfig = GenericGossipsubConfig; -pub type GossipsubConfigBuilder = GenericGossipsubConfigBuilder; -pub type GossipsubMessage = GenericGossipsubMessage; - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] /// Network configuration for lighthouse. @@ -110,35 +109,28 @@ impl Default for Config { // The function used to generate a gossipsub message id // We use the first 8 bytes of SHA256(data) for content addressing - let fast_gossip_message_id = - |message: &RawGossipsubMessage| FastMessageId::from(&Sha256::digest(&message.data)[..]); + let fast_gossip_message_id = |message: &RawGossipsubMessage| { + FastMessageId::from(&Sha256::digest(&message.data)[..8]) + }; fn prefix(prefix: [u8; 4], data: &[u8]) -> Vec { - prefix - .to_vec() - .into_iter() - .chain(data.iter().cloned()) - .collect() + let mut vec = Vec::with_capacity(prefix.len() + data.len()); + vec.extend_from_slice(&prefix); + vec.extend_from_slice(data); + vec } let gossip_message_id = |message: &GossipsubMessage| { MessageId::from( - &Sha256::digest( - { - match &message.data.decompressed { - Ok(decompressed) => prefix(MESSAGE_DOMAIN_VALID_SNAPPY, decompressed), - _ => prefix(MESSAGE_DOMAIN_INVALID_SNAPPY, &message.data.raw), - } - } - .as_slice(), - )[..20], + &Sha256::digest(prefix(MESSAGE_DOMAIN_VALID_SNAPPY, &message.data).as_slice()) + [..20], ) }; // gossipsub configuration // Note: The topics by default are sent as plain strings. Hashes are an optional // parameter. - let gs_config = GossipsubConfigBuilder::new() + let gs_config = GossipsubConfigBuilder::default() .max_transmit_size(GOSSIP_MAX_SIZE) .heartbeat_interval(Duration::from_millis(700)) .mesh_n(8) @@ -147,6 +139,7 @@ impl Default for Config { .gossip_lazy(6) .fanout_ttl(Duration::from_secs(60)) .history_length(6) + .max_messages_per_rpc(Some(10)) .history_gossip(3) .validate_messages() // require validation before propagation .validation_mode(ValidationMode::Anonymous) diff --git a/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs index 443ec29c3..daf769ea1 100644 --- a/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs +++ b/beacon_node/eth2_libp2p/src/discovery/enr_ext.rs @@ -241,7 +241,7 @@ impl CombinedKeyExt for CombinedKey { pub fn peer_id_to_node_id(peer_id: &PeerId) -> Result { // A libp2p peer id byte representation should be 2 length bytes + 4 protobuf bytes + compressed pk bytes // if generated from a PublicKey with Identity multihash. - let pk_bytes = &peer_id.as_bytes()[2..]; + let pk_bytes = &peer_id.to_bytes()[2..]; match PublicKey::from_protobuf_encoding(pk_bytes).map_err(|e| { format!( diff --git a/beacon_node/eth2_libp2p/src/lib.rs b/beacon_node/eth2_libp2p/src/lib.rs index 06a66c99f..3a582ac71 100644 --- a/beacon_node/eth2_libp2p/src/lib.rs +++ b/beacon_node/eth2_libp2p/src/lib.rs @@ -63,7 +63,6 @@ impl<'de> Deserialize<'de> for PeerIdSerialized { pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery}; pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response}; pub use config::Config as NetworkConfig; -pub use config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage}; pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr}; pub use discv5; pub use libp2p::bandwidth::BandwidthSinks; diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index e0c94d9d6..cc5e9d87b 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -136,7 +136,7 @@ impl PeerManager { /// /// Returns true if the peer was accepted into the database. pub fn dial_peer(&mut self, peer_id: &PeerId) -> bool { - self.events.push(PeerManagerEvent::Dial(peer_id.clone())); + self.events.push(PeerManagerEvent::Dial(*peer_id)); self.connect_peer(peer_id, ConnectingType::Dialing) } @@ -174,7 +174,7 @@ impl PeerManager { { // update the state of the peer. self.events - .push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason)); + .push(PeerManagerEvent::DisconnectPeer(*peer_id, reason)); } } @@ -282,7 +282,7 @@ impl PeerManager { /// A STATUS message has been received from a peer. This resets the status timer. pub fn peer_statusd(&mut self, peer_id: &PeerId) { - self.status_peers.insert(peer_id.clone()); + self.status_peers.insert(*peer_id); } /// Adds a gossipsub subscription to a peer in the peerdb. @@ -495,10 +495,10 @@ impl PeerManager { debug!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq); match peer_info.connection_direction { Some(ConnectionDirection::Incoming) => { - self.inbound_ping_peers.insert(peer_id.clone()); + self.inbound_ping_peers.insert(*peer_id); } Some(ConnectionDirection::Outgoing) => { - self.outbound_ping_peers.insert(peer_id.clone()); + self.outbound_ping_peers.insert(*peer_id); } None => { warn!(self.log, "Received a ping from a peer with an unknown connection direction"; "peer_id" => %peer_id); @@ -510,15 +510,13 @@ impl PeerManager { if meta_data.seq_number < seq { debug!(self.log, "Requesting new metadata from peer"; "peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); + self.events.push(PeerManagerEvent::MetaData(*peer_id)); } } else { // if we don't know the meta-data, request it debug!(self.log, "Requesting first metadata from peer"; "peer_id" => %peer_id); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); + self.events.push(PeerManagerEvent::MetaData(*peer_id)); } } else { crit!(self.log, "Received a PING from an unknown peer"; @@ -536,15 +534,13 @@ impl PeerManager { if meta_data.seq_number < seq { debug!(self.log, "Requesting new metadata from peer"; "peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); + self.events.push(PeerManagerEvent::MetaData(*peer_id)); } } else { // if we don't know the meta-data, request it debug!(self.log, "Requesting first metadata from peer"; "peer_id" => %peer_id); - self.events - .push(PeerManagerEvent::MetaData(peer_id.clone())); + self.events.push(PeerManagerEvent::MetaData(*peer_id)); } } else { crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => %peer_id); @@ -677,7 +673,7 @@ impl PeerManager { && !peers.is_connected_or_dialing(peer_id) && !peers.is_banned(peer_id) { - Some(peer_id.clone()) + Some(*peer_id) } else { None } @@ -755,18 +751,18 @@ impl PeerManager { ConnectingType::IngoingConnected { multiaddr } => { peerdb.connect_ingoing(peer_id, multiaddr, enr); // start a timer to ping inbound peers. - self.inbound_ping_peers.insert(peer_id.clone()); + self.inbound_ping_peers.insert(*peer_id); } ConnectingType::OutgoingConnected { multiaddr } => { peerdb.connect_outgoing(peer_id, multiaddr, enr); // start a timer for to ping outbound peers. - self.outbound_ping_peers.insert(peer_id.clone()); + self.outbound_ping_peers.insert(*peer_id); } } } // start a ping and status timer for the peer - self.status_peers.insert(peer_id.clone()); + self.status_peers.insert(*peer_id); // increment prometheus metrics metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); @@ -806,7 +802,7 @@ impl PeerManager { match info.score_state() { ScoreState::Banned => { debug!(log, "Peer has been banned"; "peer_id" => %peer_id, "score" => %info.score()); - to_ban_peers.push(peer_id.clone()); + to_ban_peers.push(*peer_id); } ScoreState::Disconnected => { debug!(log, "Peer transitioned to disconnect state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); @@ -815,18 +811,18 @@ impl PeerManager { // Change the state to inform that we are disconnecting the peer. info.disconnecting(false); events.push(PeerManagerEvent::DisconnectPeer( - peer_id.clone(), + *peer_id, GoodbyeReason::BadScore, )); } else if info.is_banned() { - to_unban_peers.push(peer_id.clone()); + to_unban_peers.push(*peer_id); } } ScoreState::Healthy => { debug!(log, "Peer transitioned to healthy state"; "peer_id" => %peer_id, "score" => %info.score(), "past_state" => %previous_state); // unban the peer if it was previously banned. if info.is_banned() { - to_unban_peers.push(peer_id.clone()); + to_unban_peers.push(*peer_id); } } } @@ -885,7 +881,7 @@ impl PeerManager { if peer_db.disconnect_and_ban(peer_id) { // The peer was currently connected, so we start a disconnection. self.events.push(PeerManagerEvent::DisconnectPeer( - peer_id.clone(), + *peer_id, GoodbyeReason::BadScore, )); } @@ -960,7 +956,7 @@ impl PeerManager { //disconnected in update_peer_scores .filter(|(_, info)| info.score_state() == ScoreState::Healthy) { - disconnecting_peers.push((*peer_id).clone()); + disconnecting_peers.push(**peer_id); } } @@ -996,7 +992,7 @@ impl Stream for PeerManager { loop { match self.inbound_ping_peers.poll_next_unpin(cx) { Poll::Ready(Some(Ok(peer_id))) => { - self.inbound_ping_peers.insert(peer_id.clone()); + self.inbound_ping_peers.insert(peer_id); self.events.push(PeerManagerEvent::Ping(peer_id)); } Poll::Ready(Some(Err(e))) => { @@ -1009,7 +1005,7 @@ impl Stream for PeerManager { loop { match self.outbound_ping_peers.poll_next_unpin(cx) { Poll::Ready(Some(Ok(peer_id))) => { - self.outbound_ping_peers.insert(peer_id.clone()); + self.outbound_ping_peers.insert(peer_id); self.events.push(PeerManagerEvent::Ping(peer_id)); } Poll::Ready(Some(Err(e))) => { @@ -1024,7 +1020,7 @@ impl Stream for PeerManager { loop { match self.status_peers.poll_next_unpin(cx) { Poll::Ready(Some(Ok(peer_id))) => { - self.status_peers.insert(peer_id.clone()); + self.status_peers.insert(peer_id); self.events.push(PeerManagerEvent::Status(peer_id)) } Poll::Ready(Some(Err(e))) => { diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index f5e93b5b2..379505d80 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -322,7 +322,7 @@ impl PeerDB { /// A peer is being dialed. pub fn dialing_peer(&mut self, peer_id: &PeerId, enr: Option) { - let info = self.peers.entry(peer_id.clone()).or_default(); + let info = self.peers.entry(*peer_id).or_default(); info.enr = enr; if info.is_disconnected() { @@ -341,7 +341,7 @@ impl PeerDB { /// Update min ttl of a peer. pub fn update_min_ttl(&mut self, peer_id: &PeerId, min_ttl: Instant) { - let info = self.peers.entry(peer_id.clone()).or_default(); + let info = self.peers.entry(*peer_id).or_default(); // only update if the ttl is longer if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { @@ -382,7 +382,7 @@ impl PeerDB { enr: Option, direction: ConnectionDirection, ) { - let info = self.peers.entry(peer_id.clone()).or_default(); + let info = self.peers.entry(*peer_id).or_default(); info.enr = enr; if info.is_disconnected() { @@ -459,7 +459,7 @@ impl PeerDB { // peer's score to be a banned state. pub fn disconnect_and_ban(&mut self, peer_id: &PeerId) -> bool { let log_ref = &self.log; - let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { + let info = self.peers.entry(*peer_id).or_insert_with(|| { warn!(log_ref, "Banning unknown peer"; "peer_id" => %peer_id); PeerInfo::default() @@ -517,7 +517,7 @@ impl PeerDB { /// If this is called for a banned peer, it will error. pub fn unban(&mut self, peer_id: &PeerId) -> Result<(), &'static str> { let log_ref = &self.log; - let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { + let info = self.peers.entry(*peer_id).or_insert_with(|| { warn!(log_ref, "UnBanning unknown peer"; "peer_id" => %peer_id); PeerInfo::default() @@ -557,7 +557,7 @@ impl PeerDB { { self.banned_peers_count .remove_banned_peer(info.seen_addresses()); - Some(id.clone()) + Some(*id) } else { // If there is no minimum, this is a coding error. crit!( @@ -584,7 +584,7 @@ impl PeerDB { _ => None, }) .min_by_key(|(_, since)| *since) - .map(|(id, _)| id.clone()) + .map(|(id, _)| *id) { debug!(self.log, "Removing old disconnected peer"; "peer_id" => %to_drop); self.peers.remove(&to_drop); diff --git a/beacon_node/eth2_libp2p/src/rpc/mod.rs b/beacon_node/eth2_libp2p/src/rpc/mod.rs index d491a082c..a1f4fac03 100644 --- a/beacon_node/eth2_libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2_libp2p/src/rpc/mod.rs @@ -190,7 +190,7 @@ where debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id); let rpc_event = RPCSend::Request(RequestId::Behaviour, RPCRequest::MetaData(PhantomData)); self.events.push(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), + peer_id: *peer_id, handler: NotifyHandler::Any, event: rpc_event, }); diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index f67cf821d..e49344919 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -9,7 +9,8 @@ use crate::EnrExt; use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource}; use futures::prelude::*; use libp2p::core::{ - identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed, + connection::ConnectionLimits, identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, + transport::Boxed, }; use libp2p::{ bandwidth::{BandwidthLogging, BandwidthSinks}, @@ -28,7 +29,7 @@ use types::{ChainSpec, EnrForkId, EthSpec}; pub const NETWORK_KEY_FILENAME: &str = "key"; /// The maximum simultaneous libp2p connections per peer. -const MAX_CONNECTIONS_PER_PEER: usize = 1; +const MAX_CONNECTIONS_PER_PEER: u32 = 1; /// The filename to store our local metadata. pub const METADATA_FILENAME: &str = "metadata"; @@ -123,13 +124,20 @@ impl Service { self.0.spawn(f, "libp2p"); } } + + // sets up the libp2p connection limits + let limits = ConnectionLimits::default() + .with_max_pending_incoming(Some(5)) + .with_max_pending_outgoing(Some(16)) + .with_max_established_incoming(Some((config.target_peers as f64 * 1.2) as u32)) + .with_max_established_outgoing(Some((config.target_peers as f64 * 1.2) as u32)) + .with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER)); + ( - SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) - .notify_handler_buffer_size(std::num::NonZeroUsize::new(32).expect("Not zero")) + SwarmBuilder::new(transport, behaviour, local_peer_id) + .notify_handler_buffer_size(std::num::NonZeroUsize::new(7).expect("Not zero")) .connection_event_buffer_size(64) - .incoming_connection_limit(10) - .outgoing_connection_limit(config.target_peers * 2) - .peer_connection_limit(MAX_CONNECTIONS_PER_PEER) + .connection_limits(limits) .executor(Box::new(Executor(executor))) .build(), bandwidth, @@ -146,7 +154,7 @@ impl Service { match Swarm::listen_on(&mut swarm, listen_multiaddr.clone()) { Ok(_) => { let mut log_address = listen_multiaddr; - log_address.push(Protocol::P2p(local_peer_id.clone().into())); + log_address.push(Protocol::P2p(local_peer_id.into())); info!(log, "Listening established"; "address" => %log_address); } Err(err) => { diff --git a/beacon_node/eth2_libp2p/src/types/globals.rs b/beacon_node/eth2_libp2p/src/types/globals.rs index 746bb335b..67abcf772 100644 --- a/beacon_node/eth2_libp2p/src/types/globals.rs +++ b/beacon_node/eth2_libp2p/src/types/globals.rs @@ -61,7 +61,7 @@ impl NetworkGlobals { /// Returns the local libp2p PeerID. pub fn local_peer_id(&self) -> PeerId { - self.peer_id.read().clone() + *self.peer_id.read() } /// Returns the list of `Multiaddr` that the underlying libp2p instance is listening on. diff --git a/beacon_node/eth2_libp2p/src/types/mod.rs b/beacon_node/eth2_libp2p/src/types/mod.rs index 097c6c87c..156e6a1d7 100644 --- a/beacon_node/eth2_libp2p/src/types/mod.rs +++ b/beacon_node/eth2_libp2p/src/types/mod.rs @@ -13,7 +13,7 @@ pub type EnrBitfield = BitVector; pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; -pub use pubsub::{MessageData, PubsubMessage}; +pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::SubnetDiscovery; pub use sync_state::SyncState; pub use topics::{subnet_id_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, CORE_TOPICS}; diff --git a/beacon_node/eth2_libp2p/src/types/pubsub.rs b/beacon_node/eth2_libp2p/src/types/pubsub.rs index f62898e9c..325a6c443 100644 --- a/beacon_node/eth2_libp2p/src/types/pubsub.rs +++ b/beacon_node/eth2_libp2p/src/types/pubsub.rs @@ -1,44 +1,18 @@ //! Handles the encoding and decoding of pubsub messages. -use crate::config::GOSSIP_MAX_SIZE; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::TopicHash; +use libp2p::gossipsub::{DataTransform, GossipsubMessage, RawGossipsubMessage}; use snap::raw::{decompress_len, Decoder, Encoder}; use ssz::{Decode, Encode}; use std::boxed::Box; +use std::io::{Error, ErrorKind}; use types::SubnetId; use types::{ Attestation, AttesterSlashing, EthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, }; -#[derive(Clone)] -pub struct MessageData { - pub raw: Vec, - pub decompressed: Result, String>, -} - -impl AsRef<[u8]> for MessageData { - fn as_ref(&self) -> &[u8] { - self.raw.as_ref() - } -} - -impl Into> for MessageData { - fn into(self) -> Vec { - self.raw - } -} - -impl From> for MessageData { - fn from(raw: Vec) -> Self { - Self { - decompressed: decompress_snappy(raw.as_ref()), - raw, - } - } -} - #[derive(Debug, Clone, PartialEq)] pub enum PubsubMessage { /// Gossipsub message providing notification of a new block. @@ -55,21 +29,63 @@ pub enum PubsubMessage { AttesterSlashing(Box>), } -fn decompress_snappy(data: &[u8]) -> Result, String> { - // Exit early if uncompressed data is > GOSSIP_MAX_SIZE - match decompress_len(data) { - Ok(n) if n > GOSSIP_MAX_SIZE => { - return Err("ssz_snappy decoded data > GOSSIP_MAX_SIZE".into()); +// Implements the `DataTransform` trait of gossipsub to employ snappy compression +pub struct SnappyTransform { + /// Sets the maximum size we allow gossipsub messages to decompress to. + max_size_per_message: usize, +} + +impl SnappyTransform { + pub fn new(max_size_per_message: usize) -> Self { + SnappyTransform { + max_size_per_message, } - Ok(_) => {} - Err(e) => { - return Err(format!("{}", e)); + } +} + +impl DataTransform for SnappyTransform { + // Provides the snappy decompression from RawGossipsubMessages + fn inbound_transform( + &self, + raw_message: RawGossipsubMessage, + ) -> Result { + // check the length of the raw bytes + let len = decompress_len(&raw_message.data)?; + if len > self.max_size_per_message { + return Err(Error::new( + ErrorKind::InvalidData, + "ssz_snappy decoded data > GOSSIP_MAX_SIZE", + )); } - }; - let mut decoder = Decoder::new(); - match decoder.decompress_vec(data) { - Ok(decompressed_data) => Ok(decompressed_data), - Err(e) => Err(format!("{}", e)), + + let mut decoder = Decoder::new(); + let decompressed_data = decoder.decompress_vec(&raw_message.data)?; + + // Build the GossipsubMessage struct + Ok(GossipsubMessage { + source: raw_message.source, + data: decompressed_data, + sequence_number: raw_message.sequence_number, + topic: raw_message.topic, + }) + } + + /// Provides the snappy compression logic to gossipsub. + fn outbound_transform( + &self, + _topic: &TopicHash, + data: Vec, + ) -> Result, std::io::Error> { + // Currently we are not employing topic-based compression. Everything is expected to be + // snappy compressed. + if data.len() > self.max_size_per_message { + return Err(Error::new( + ErrorKind::InvalidData, + "ssz_snappy Encoded data > GOSSIP_MAX_SIZE", + )); + } + let mut encoder = Encoder::new(); + encoder.compress_vec(&data).map_err(Into::into) } } @@ -98,48 +114,49 @@ impl PubsubMessage { /* Note: This is assuming we are not hashing topics. If we choose to hash topics, these will * need to be modified. */ - pub fn decode(topic: &TopicHash, data: &MessageData) -> Result { + pub fn decode(topic: &TopicHash, data: &[u8]) -> Result { match GossipTopic::decode(topic.as_str()) { Err(_) => Err(format!("Unknown gossipsub topic: {:?}", topic)), Ok(gossip_topic) => { - let decompressed_data = match gossip_topic.encoding() { - GossipEncoding::SSZSnappy => data.decompressed.as_ref()?.as_slice(), - }; + // All topics are currently expected to be compressed and decompressed with snappy. + // This is done in the `SnappyTransform` struct. + // Therefore compression has already been handled for us by the time we are + // decoding the objects here. + // the ssz decoders match gossip_topic.kind() { GossipKind::BeaconAggregateAndProof => { - let agg_and_proof = - SignedAggregateAndProof::from_ssz_bytes(decompressed_data) - .map_err(|e| format!("{:?}", e))?; + let agg_and_proof = SignedAggregateAndProof::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; Ok(PubsubMessage::AggregateAndProofAttestation(Box::new( agg_and_proof, ))) } GossipKind::Attestation(subnet_id) => { - let attestation = Attestation::from_ssz_bytes(decompressed_data) - .map_err(|e| format!("{:?}", e))?; + let attestation = + Attestation::from_ssz_bytes(data).map_err(|e| format!("{:?}", e))?; Ok(PubsubMessage::Attestation(Box::new(( *subnet_id, attestation, )))) } GossipKind::BeaconBlock => { - let beacon_block = SignedBeaconBlock::from_ssz_bytes(decompressed_data) + let beacon_block = SignedBeaconBlock::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; Ok(PubsubMessage::BeaconBlock(Box::new(beacon_block))) } GossipKind::VoluntaryExit => { - let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(decompressed_data) + let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; Ok(PubsubMessage::VoluntaryExit(Box::new(voluntary_exit))) } GossipKind::ProposerSlashing => { - let proposer_slashing = ProposerSlashing::from_ssz_bytes(decompressed_data) + let proposer_slashing = ProposerSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; Ok(PubsubMessage::ProposerSlashing(Box::new(proposer_slashing))) } GossipKind::AttesterSlashing => { - let attester_slashing = AttesterSlashing::from_ssz_bytes(decompressed_data) + let attester_slashing = AttesterSlashing::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; Ok(PubsubMessage::AttesterSlashing(Box::new(attester_slashing))) } @@ -150,26 +167,18 @@ impl PubsubMessage { /// Encodes a `PubsubMessage` based on the topic encodings. The first known encoding is used. If /// no encoding is known, and error is returned. - pub fn encode(&self, encoding: GossipEncoding) -> Result, String> { - let data = match &self { + pub fn encode(&self, _encoding: GossipEncoding) -> Vec { + // Currently do not employ encoding strategies based on the topic. All messages are ssz + // encoded. + // Also note, that the compression is handled by the `SnappyTransform` struct. Gossipsub will compress the + // messages for us. + match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), - }; - match encoding { - GossipEncoding::SSZSnappy => { - let mut encoder = Encoder::new(); - match encoder.compress_vec(&data) { - Ok(compressed) if compressed.len() > GOSSIP_MAX_SIZE => { - Err("ssz_snappy Encoded data > GOSSIP_MAX_SIZE".into()) - } - Ok(compressed) => Ok(compressed), - Err(e) => Err(format!("{}", e)), - } - } } } } @@ -200,20 +209,3 @@ impl std::fmt::Display for PubsubMessage { } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_gossip_max_size() { - // Cannot decode more than gossip max size - let mut encoder = Encoder::new(); - let payload = encoder.compress_vec(&[0; GOSSIP_MAX_SIZE + 1]).unwrap(); - let message_data: MessageData = payload.into(); - assert_eq!( - message_data.decompressed.unwrap_err(), - "ssz_snappy decoded data > GOSSIP_MAX_SIZE".to_string() - ); - } -} diff --git a/beacon_node/eth2_libp2p/tests/common/mod.rs b/beacon_node/eth2_libp2p/tests/common/mod.rs index c0d08a91d..af879f51b 100644 --- a/beacon_node/eth2_libp2p/tests/common/mod.rs +++ b/beacon_node/eth2_libp2p/tests/common/mod.rs @@ -3,7 +3,8 @@ use eth2_libp2p::Enr; use eth2_libp2p::EnrExt; use eth2_libp2p::Multiaddr; use eth2_libp2p::Service as LibP2PService; -use eth2_libp2p::{GossipsubConfigBuilder, Libp2pEvent, NetworkConfig}; +use eth2_libp2p::{Libp2pEvent, NetworkConfig}; +use libp2p::gossipsub::GossipsubConfigBuilder; use slog::{debug, error, o, Drain}; use std::net::{TcpListener, UdpSocket}; use std::sync::Weak; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5524a290c..0a625cdcc 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1389,7 +1389,7 @@ pub fn serve( |requested_peer_id: String, network_globals: Arc>| { blocking_json_task(move || { let peer_id = PeerId::from_bytes( - bs58::decode(requested_peer_id.as_str()) + &bs58::decode(requested_peer_id.as_str()) .into_vec() .map_err(|e| { warp_utils::reject::custom_bad_request(format!( diff --git a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs index 1c40721a2..d9aec89d8 100644 --- a/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/gossip_methods.rs @@ -83,7 +83,7 @@ impl Worker { // Indicate to the `Network` service that this message is valid and can be // propagated on the gossip network. - self.propagate_validation_result(message_id, peer_id.clone(), MessageAcceptance::Accept); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); if !should_import { return; @@ -160,7 +160,7 @@ impl Worker { // Indicate to the `Network` service that this message is valid and can be // propagated on the gossip network. - self.propagate_validation_result(message_id, peer_id.clone(), MessageAcceptance::Accept); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); metrics::inc_counter(&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL); @@ -219,11 +219,7 @@ impl Worker { "slot" => verified_block.block.slot(), "hash" => %verified_block.block_root ); - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Accept, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); verified_block } Err(BlockError::ParentUnknown(block)) => { @@ -239,7 +235,7 @@ impl Worker { debug!(self.log, "Could not verify block for gossip, ignoring the block"; "error" => %e); // Prevent recurring behaviour by penalizing the peer slightly. - self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; } @@ -258,11 +254,7 @@ impl Worker { | Err(e @ BlockError::GenesisBlock) => { warn!(self.log, "Could not verify block for gossip, rejecting the block"; "error" => %e); - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); return; } @@ -337,11 +329,7 @@ impl Worker { let exit = match self.chain.verify_voluntary_exit_for_gossip(voluntary_exit) { Ok(ObservationOutcome::New(exit)) => exit, Ok(ObservationOutcome::AlreadyKnown) => { - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Ignore, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); debug!( self.log, "Dropping exit for already exiting validator"; @@ -360,11 +348,7 @@ impl Worker { ); // These errors occur due to a fault in the beacon chain. It is not necessarily // the fault on the peer. - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Ignore, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); // We still penalize a peer slightly to prevent overuse of invalids. self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); return; @@ -416,11 +400,7 @@ impl Worker { "peer" => %peer_id, "error" => ?e ); - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Ignore, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); // Penalize peer slightly for invalids. self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); @@ -466,11 +446,7 @@ impl Worker { "peer" => %peer_id, "error" => ?e ); - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Ignore, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); // Penalize peer slightly for invalids. self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); return; @@ -522,14 +498,10 @@ impl Worker { // Peers that are slow or not to spec can spam us with these messages draining our // bandwidth. We therefore penalize these peers when they do this. - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); // Do not propagate these messages. - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Ignore, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } AttnError::InvalidSelectionProof { .. } | AttnError::InvalidSignature => { /* @@ -537,12 +509,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::EmptyAggregationBitfield => { /* @@ -552,12 +520,8 @@ impl Worker { * violation of the spec nor indication of fault. * */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::AggregatorPubkeyUnknown(_) => { /* @@ -573,12 +537,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::AggregatorNotInCommittee { .. } => { /* @@ -594,12 +554,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::AttestationAlreadyKnown { .. } => { /* @@ -632,10 +588,7 @@ impl Worker { "block" => %beacon_block_root, "type" => ?attestation_type, ); - // We still penalize the peer slightly. We don't want this to be a recurring - // behaviour. - self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); - + // This is an allowed behaviour. self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return; @@ -646,7 +599,7 @@ impl Worker { * * The peer is not necessarily faulty. */ - trace!( + debug!( self.log, "Prior attestation known"; "peer_id" => %peer_id, @@ -655,7 +608,7 @@ impl Worker { ); // We still penalize the peer slightly. We don't want this to be a recurring // behaviour. - self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); @@ -668,12 +621,15 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, + debug!( + self.log, + "Validation Index too high"; + "peer_id" => %peer_id, + "block" => %beacon_block_root, + "type" => ?attestation_type, ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::UnknownHeadBlock { beacon_block_root } => { // Note: its a little bit unclear as to whether or not this block is unknown or @@ -691,10 +647,7 @@ impl Worker { ); // we don't know the block, get the sync manager to handle the block lookup self.sync_tx - .send(SyncMessage::UnknownBlockHash( - peer_id.clone(), - *beacon_block_root, - )) + .send(SyncMessage::UnknownBlockHash(peer_id, *beacon_block_root)) .unwrap_or_else(|_| { warn!( self.log, @@ -722,12 +675,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::BadTargetEpoch => { /* @@ -736,12 +685,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::NoCommitteeForSlotAndIndex { .. } => { /* @@ -749,12 +694,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::NotExactlyOneAggregationBitSet(_) => { /* @@ -762,12 +703,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::AttestsToFutureBlock { .. } => { /* @@ -775,12 +712,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::InvalidSubnetId { received, expected } => { @@ -793,12 +726,8 @@ impl Worker { "expected" => ?expected, "received" => ?received, ); - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::Invalid(_) => { /* @@ -806,12 +735,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::InvalidTargetEpoch { .. } => { /* @@ -819,12 +744,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::InvalidTargetRoot { .. } => { /* @@ -832,12 +753,8 @@ impl Worker { * * The peer has published an invalid consensus message. */ - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::LowToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::LowToleranceError); } AttnError::TooManySkippedSlots { head_block_slot, @@ -856,12 +773,8 @@ impl Worker { ); // In this case we wish to penalize gossipsub peers that do this to avoid future // attestations that have too many skip slots. - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Reject, - ); - self.gossip_penalize_peer(peer_id.clone(), PeerAction::MidToleranceError); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); + self.gossip_penalize_peer(peer_id, PeerAction::MidToleranceError); } AttnError::BeaconChainError(e) => { /* @@ -877,13 +790,9 @@ impl Worker { "peer_id" => %peer_id, "error" => ?e, ); - self.propagate_validation_result( - message_id, - peer_id.clone(), - MessageAcceptance::Ignore, - ); + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); // Penalize the peer slightly - self.gossip_penalize_peer(peer_id.clone(), PeerAction::HighToleranceError); + self.gossip_penalize_peer(peer_id, PeerAction::HighToleranceError); } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index f0c8e85f7..71b9b1139 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -116,7 +116,7 @@ impl Worker { for root in request.block_roots.iter() { if let Ok(Some(block)) = self.chain.store.get_block(root) { self.send_response( - peer_id.clone(), + peer_id, Response::BlocksByRoot(Some(Box::new(block))), request_id, ); @@ -212,7 +212,7 @@ impl Worker { { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { - peer_id: peer_id.clone(), + peer_id, response: Response::BlocksByRange(Some(Box::new(block))), id: request_id, }); diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 3155aeb08..677e8d036 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -126,11 +126,8 @@ impl Processor { // ignore status responses if we are shutting down if let Ok(status_message) = status_message(&self.chain) { // Say status back. - self.network.send_response( - peer_id.clone(), - Response::Status(status_message), - request_id, - ); + self.network + .send_response(peer_id, Response::Status(status_message), request_id); } self.send_beacon_processor_work(BeaconWorkEvent::status_message(peer_id, status)) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 581dfe4c4..88e435422 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -713,7 +713,7 @@ impl SyncManager { // The sent block is not the correct block, remove the head block and downvote // the peer let _ = parent_request.downloaded_blocks.pop(); - let peer = parent_request.last_submitted_peer.clone(); + let peer = parent_request.last_submitted_peer; warn!(self.log, "Peer sent invalid parent."; "peer_id" => %peer, @@ -759,7 +759,7 @@ impl SyncManager { } Ok(_) | Err(BlockError::BlockIsAlreadyKnown { .. }) => { let process_id = ProcessId::ParentLookup( - parent_request.last_submitted_peer.clone(), + parent_request.last_submitted_peer, chain_block_hash, ); let blocks = parent_request.downloaded_blocks; @@ -852,7 +852,7 @@ impl SyncManager { // We continue to search for the chain of blocks from the same peer. Other peers are not // guaranteed to have this chain of blocks. - let peer_id = parent_request.last_submitted_peer.clone(); + let peer_id = parent_request.last_submitted_peer; if let Ok(request_id) = self.network.blocks_by_root_request(peer_id, request) { // if the request was successful add the queue back into self diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index c43d4325f..731009032 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -102,11 +102,11 @@ impl BatchInfo { ); for attempt in &self.failed_processing_attempts { - peers.insert(attempt.peer_id.clone()); + peers.insert(attempt.peer_id); } for download in &self.failed_download_attempts { - peers.insert(download.clone()); + peers.insert(*download); } peers diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index fbb222e2f..ccf18d2e5 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -606,7 +606,7 @@ impl SyncingChain { "batch_epoch" => id, "score_adjustment" => %action, "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id ); - network.report_peer(attempt.peer_id.clone(), action); + network.report_peer(attempt.peer_id, action); } else { // The same peer corrected it's previous mistake. There was an error, so we // negative score the original peer. @@ -615,7 +615,7 @@ impl SyncingChain { "batch_epoch" => id, "score_adjustment" => %action, "original_peer" => %attempt.peer_id, "new_peer" => %processed_attempt.peer_id ); - network.report_peer(attempt.peer_id.clone(), action); + network.report_peer(attempt.peer_id, action); } } } @@ -822,11 +822,11 @@ impl SyncingChain { let mut priorized_peers = self .peers .iter() - .map(|(peer, requests)| (failed_peers.contains(peer), requests.len(), peer)) + .map(|(peer, requests)| (failed_peers.contains(peer), requests.len(), *peer)) .collect::>(); // Sort peers prioritizing unrelated peers with less active requests. priorized_peers.sort_unstable(); - priorized_peers.get(0).map(|&(_, _, peer)| peer.clone()) + priorized_peers.get(0).map(|&(_, _, peer)| peer) }; if let Some(peer) = new_peer { @@ -846,10 +846,10 @@ impl SyncingChain { ) -> ProcessingResult { if let Some(batch) = self.batches.get_mut(&batch_id) { let request = batch.to_blocks_by_range_request(); - match network.blocks_by_range_request(peer.clone(), request, self.id, batch_id) { + match network.blocks_by_range_request(peer, request, self.id, batch_id) { Ok(request_id) => { // inform the batch about the new request - batch.start_downloading_from_peer(peer.clone(), request_id)?; + batch.start_downloading_from_peer(peer, request_id)?; if self .optimistic_start .map(|epoch| epoch == batch_id) @@ -879,7 +879,7 @@ impl SyncingChain { warn!(self.log, "Could not send batch request"; "batch_id" => batch_id, "error" => e, &batch); // register the failed download and check if the batch can be retried - batch.start_downloading_from_peer(peer.clone(), 1)?; // fake request_id is not relevant + batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant self.peers .get_mut(&peer) .map(|request| request.remove(&batch_id)); @@ -922,7 +922,7 @@ impl SyncingChain { .iter() .filter_map(|(peer, requests)| { if requests.is_empty() { - Some(peer.clone()) + Some(*peer) } else { None }