From bcdd960ab1a945ebc658eb05cbe2174a3c133ba8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 12 May 2022 00:42:17 +0000 Subject: [PATCH] Separate execution payloads in the DB (#3157) ## Proposed Changes Reduce post-merge disk usage by not storing finalized execution payloads in Lighthouse's database. :warning: **This is achieved in a backwards-incompatible way for networks that have already merged** :warning:. Kiln users and shadow fork enjoyers will be unable to downgrade after running the code from this PR. The upgrade migration may take several minutes to run, and can't be aborted after it begins. The main changes are: - New column in the database called `ExecPayload`, keyed by beacon block root. - The `BeaconBlock` column now stores blinded blocks only. - Lots of places that previously used full blocks now use blinded blocks, e.g. analytics APIs, block replay in the DB, etc. - On finalization: - `prune_abandoned_forks` deletes non-canonical payloads whilst deleting non-canonical blocks. - `migrate_db` deletes finalized canonical payloads whilst deleting finalized states. - Conversions between blinded and full blocks are implemented in a compositional way, duplicating some work from Sean's PR #3134. - The execution layer has a new `get_payload_by_block_hash` method that reconstructs a payload using the EE's `eth_getBlockByHash` call. - I've tested manually that it works on Kiln, using Geth and Nethermind. - This isn't necessarily the most efficient method, and new engine APIs are being discussed to improve this: https://github.com/ethereum/execution-apis/pull/146. - We're depending on the `ethers` master branch, due to lots of recent changes. We're also using a workaround for https://github.com/gakonst/ethers-rs/issues/1134. - Payload reconstruction is used in the HTTP API via `BeaconChain::get_block`, which is now `async`. Due to the `async` fn, the `blocking_json` wrapper has been removed. 
- Payload reconstruction is used in network RPC to serve blocks-by-{root,range} responses. Here the `async` adjustment is messier, although I think I've managed to come up with a reasonable compromise: the handlers take the `SendOnDrop` by value so that they can drop it on _task completion_ (after the `fn` returns). Still, this is introducing disk reads onto core executor threads, which may have a negative performance impact (thoughts appreciated). ## Additional Info - [x] For performance it would be great to remove the cloning of full blocks when converting them to blinded blocks to write to disk. I'm going to experiment with a `put_block` API that takes the block by value, breaks it into a blinded block and a payload, stores the blinded block, and then re-assembles the full block for the caller. - [x] We should measure the latency of blocks-by-root and blocks-by-range responses. - [x] We should add integration tests that stress the payload reconstruction (basic tests done, issue for more extensive tests: https://github.com/sigp/lighthouse/issues/3159) - [x] We should (manually) test the schema v9 migration from several prior versions, particularly as blocks have changed on disk and some migrations rely on being able to load blocks. 
Co-authored-by: Paul Hauner --- Cargo.lock | 407 ++++++++++++++++-- beacon_node/beacon_chain/Cargo.toml | 3 +- beacon_node/beacon_chain/src/beacon_chain.rs | 106 ++++- .../src/beacon_fork_choice_store.rs | 9 +- .../beacon_chain/src/beacon_snapshot.rs | 15 +- beacon_node/beacon_chain/src/block_reward.rs | 6 +- .../beacon_chain/src/block_verification.rs | 10 +- beacon_node/beacon_chain/src/builder.rs | 8 +- beacon_node/beacon_chain/src/errors.rs | 12 + .../beacon_chain/src/execution_payload.rs | 2 +- beacon_node/beacon_chain/src/fork_revert.rs | 12 +- .../beacon_chain/src/historical_blocks.rs | 9 +- beacon_node/beacon_chain/src/migrate.rs | 11 +- .../src/pre_finalization_cache.rs | 2 +- beacon_node/beacon_chain/src/schema_change.rs | 14 +- .../src/schema_change/migration_schema_v7.rs | 2 +- .../src/schema_change/migration_schema_v8.rs | 2 +- .../src/schema_change/migration_schema_v9.rs | 176 ++++++++ .../beacon_chain/src/snapshot_cache.rs | 12 +- beacon_node/beacon_chain/src/test_utils.rs | 7 +- .../tests/attestation_production.rs | 8 +- .../tests/attestation_verification.rs | 2 +- .../beacon_chain/tests/block_verification.rs | 12 + .../tests/payload_invalidation.rs | 47 +- beacon_node/beacon_chain/tests/store_tests.rs | 47 +- beacon_node/beacon_chain/tests/tests.rs | 12 +- beacon_node/execution_layer/Cargo.toml | 1 + beacon_node/execution_layer/src/engine_api.rs | 39 +- .../execution_layer/src/engine_api/http.rs | 11 +- beacon_node/execution_layer/src/lib.rs | 58 +++ beacon_node/execution_layer/src/metrics.rs | 4 + .../http_api/src/attestation_performance.rs | 10 +- beacon_node/http_api/src/block_id.rs | 54 ++- .../http_api/src/block_packing_efficiency.rs | 14 +- beacon_node/http_api/src/database.rs | 6 +- beacon_node/http_api/src/lib.rs | 25 +- beacon_node/http_api/tests/tests.rs | 15 +- beacon_node/lighthouse_network/Cargo.toml | 2 +- .../network/src/beacon_processor/mod.rs | 21 +- .../beacon_processor/worker/rpc_methods.rs | 200 ++++++--- 
.../beacon_processor/worker/sync_methods.rs | 7 +- beacon_node/store/src/errors.rs | 4 + beacon_node/store/src/hot_cold_store.rs | 289 +++++++++---- beacon_node/store/src/impls.rs | 1 + .../store/src/impls/execution_payload.rs | 17 + beacon_node/store/src/iter.rs | 20 +- beacon_node/store/src/leveldb_store.rs | 22 + beacon_node/store/src/lib.rs | 19 +- beacon_node/store/src/metadata.rs | 2 +- beacon_node/store/src/reconstruct.rs | 2 +- consensus/fork_choice/src/fork_choice.rs | 4 +- .../fork_choice/src/fork_choice_store.rs | 6 +- consensus/fork_choice/tests/tests.rs | 2 +- .../state_processing/src/block_replayer.rs | 12 +- consensus/types/Cargo.toml | 2 +- consensus/types/src/beacon_block.rs | 122 +++++- consensus/types/src/beacon_block_body.rs | 192 +++++++++ consensus/types/src/lib.rs | 2 +- consensus/types/src/payload.rs | 30 +- consensus/types/src/signed_beacon_block.rs | 203 ++++++++- testing/ef_tests/src/cases/fork_choice.rs | 6 +- .../src/test_rig.rs | 24 +- 62 files changed, 2009 insertions(+), 392 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs create mode 100644 beacon_node/store/src/impls/execution_payload.rs diff --git a/Cargo.lock b/Cargo.lock index dc4044c43..f8016d994 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,17 +263,29 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base64" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64ct" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dea908e7347a8c64e378c17e30ef880ad73e3b4498346b055c2c00ea342f3179" + [[package]] name = "beacon_chain" version = "0.2.0" 
dependencies = [ - "bitvec", + "bitvec 0.20.4", "bls", "derivative", "environment", @@ -287,6 +299,7 @@ dependencies = [ "fork_choice", "futures", "genesis", + "hex", "int_to_bytes", "itertools", "lazy_static", @@ -394,10 +407,22 @@ version = "0.20.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7774144344a4faa177370406a7ff5f1da24303817368584c6206c8303eb07848" dependencies = [ - "funty", - "radium", + "funty 1.1.0", + "radium 0.6.2", "tap", - "wyz", + "wyz 0.2.0", +] + +[[package]] +name = "bitvec" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1489fcb93a5bb47da0462ca93ad252ad6af2145cce58d10d46a83931ba9f016b" +dependencies = [ + "funty 2.0.0", + "radium 0.7.0", + "tap", + "wyz 0.5.0", ] [[package]] @@ -443,7 +468,7 @@ dependencies = [ "eth2_hashing", "eth2_serde_utils", "eth2_ssz", - "ethereum-types", + "ethereum-types 0.12.1", "hex", "milagro_bls", "rand 0.7.3", @@ -542,6 +567,9 @@ name = "bytes" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +dependencies = [ + "serde", +] [[package]] name = "bzip2" @@ -572,7 +600,7 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "eth2_ssz_types", - "ethereum-types", + "ethereum-types 0.12.1", "quickcheck 0.9.2", "quickcheck_macros", "smallvec", @@ -690,7 +718,7 @@ dependencies = [ "dirs", "eth2_network_config", "eth2_ssz", - "ethereum-types", + "ethereum-types 0.12.1", "hex", "serde", "serde_json", @@ -776,6 +804,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "279bc8fc53f788a75c7804af68237d1fce02cde1e275a886a4b320604dc2aeda" +[[package]] +name = "const-oid" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" + [[package]] name = "convert_case" version = "0.4.0" @@ -912,6 
+946,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-bigint" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c6a1d5fa1de37e071642dfa44ec552ca5b299adb128fab16138e24b548fd21" +dependencies = [ + "generic-array", + "rand_core 0.6.3", + "subtle", + "zeroize", +] + [[package]] name = "crypto-common" version = "0.1.3" @@ -1099,7 +1145,7 @@ name = "deposit_contract" version = "0.2.0" dependencies = [ "eth2_ssz", - "ethabi", + "ethabi 16.0.0", "hex", "reqwest", "serde_json", @@ -1114,10 +1160,19 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eeb9d92785d1facb50567852ce75d0858630630e7eabea59cf7eb7474051087" dependencies = [ - "const-oid", + "const-oid 0.5.2", "typenum", ] +[[package]] +name = "der" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" +dependencies = [ + "const-oid 0.7.1", +] + [[package]] name = "derivative" version = "2.2.0" @@ -1268,12 +1323,24 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34d33b390ab82f2e1481e331dbd0530895640179d2128ef9a79cc690b78d1eba" dependencies = [ - "der", - "elliptic-curve", + "der 0.3.5", + "elliptic-curve 0.9.12", "hmac 0.11.0", "signature", ] +[[package]] +name = "ecdsa" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0d69ae62e0ce582d56380743515fefaf1a8c70cec685d9677636d7e30ae9dc9" +dependencies = [ + "der 0.5.1", + "elliptic-curve 0.11.12", + "rfc6979", + "signature", +] + [[package]] name = "ed25519" version = "1.4.1" @@ -1309,7 +1376,7 @@ dependencies = [ "derivative", "eth2_ssz", "eth2_ssz_derive", - "ethereum-types", + "ethereum-types 0.12.1", "fork_choice", "fs2", 
"hex", @@ -1339,16 +1406,34 @@ version = "0.9.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c13e9b0c3c4170dcc2a12783746c4205d98e18957f57854251eea3f9750fe005" dependencies = [ - "bitvec", - "ff", + "bitvec 0.20.4", + "ff 0.9.0", "generic-array", - "group", - "pkcs8", + "group 0.9.0", + "pkcs8 0.6.1", "rand_core 0.6.3", "subtle", "zeroize", ] +[[package]] +name = "elliptic-curve" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25b477563c2bfed38a3b7a60964c49e058b2510ad3f12ba3483fd8f62c2306d6" +dependencies = [ + "base16ct", + "crypto-bigint", + "der 0.5.1", + "ff 0.11.0", + "generic-array", + "group 0.11.0", + "rand_core 0.6.3", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "encoding_rs" version = "0.8.30" @@ -1369,12 +1454,12 @@ dependencies = [ "bytes", "ed25519-dalek", "hex", - "k256", + "k256 0.8.1", "log", "rand 0.8.5", "rlp", "serde", - "sha3", + "sha3 0.9.1", "zeroize", ] @@ -1599,7 +1684,7 @@ dependencies = [ name = "eth2_serde_utils" version = "0.1.1" dependencies = [ - "ethereum-types", + "ethereum-types 0.12.1", "hex", "serde", "serde_derive", @@ -1611,7 +1696,7 @@ name = "eth2_ssz" version = "0.4.1" dependencies = [ "eth2_ssz_derive", - "ethereum-types", + "ethereum-types 0.12.1", "smallvec", ] @@ -1673,11 +1758,28 @@ version = "16.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4c98847055d934070b90e806e12d3936b787d0a115068981c1d8dfd5dfef5a5" dependencies = [ - "ethereum-types", + "ethereum-types 0.12.1", "hex", "serde", "serde_json", - "sha3", + "sha3 0.9.1", + "thiserror", + "uint", +] + +[[package]] +name = "ethabi" +version = "17.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b69517146dfab88e9238c00c724fd8e277951c3cc6f22b016d72f422a832213e" +dependencies = [ + "ethereum-types 0.13.1", + "hex", + "once_cell", + "regex", + "serde", + "serde_json", + "sha3 0.10.1", "thiserror", "uint", 
] @@ -1695,20 +1797,68 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "ethbloom" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11da94e443c60508eb62cf256243a64da87304c2802ac2528847f79d750007ef" +dependencies = [ + "crunchy", + "fixed-hash", + "impl-rlp", + "impl-serde", + "tiny-keccak", +] + [[package]] name = "ethereum-types" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05136f7057fe789f06e6d41d07b34e6f70d8c86e5693b60f97aaa6553553bdaf" dependencies = [ - "ethbloom", + "ethbloom 0.11.1", "fixed-hash", "impl-rlp", "impl-serde", - "primitive-types", + "primitive-types 0.10.1", "uint", ] +[[package]] +name = "ethereum-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2827b94c556145446fcce834ca86b7abf0c39a805883fe20e72c5bfdb5a0dc6" +dependencies = [ + "ethbloom 0.12.1", + "fixed-hash", + "impl-rlp", + "impl-serde", + "primitive-types 0.11.1", + "uint", +] + +[[package]] +name = "ethers-core" +version = "0.6.0" +source = "git+https://github.com/gakonst/ethers-rs?rev=02ad93a1cfb7b62eb051c77c61dc4c0218428e4a#02ad93a1cfb7b62eb051c77c61dc4c0218428e4a" +dependencies = [ + "arrayvec", + "bytes", + "elliptic-curve 0.11.12", + "ethabi 17.0.0", + "generic-array", + "hex", + "k256 0.10.4", + "rand 0.8.5", + "rlp", + "rlp-derive", + "serde", + "serde_json", + "thiserror", + "tiny-keccak", +] + [[package]] name = "execution_engine_integration" version = "0.1.0" @@ -1736,6 +1886,7 @@ dependencies = [ "eth1", "eth2_serde_utils", "eth2_ssz_types", + "ethers-core", "exit-future", "futures", "hex", @@ -1804,7 +1955,17 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72a4d941a5b7c2a75222e2d44fcdf634a67133d9db31e177ae5ff6ecda852bfe" dependencies = [ - "bitvec", + "bitvec 0.20.4", + "rand_core 0.6.3", + "subtle", +] + +[[package]] +name = "ff" +version = "0.11.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2958d04124b9f27f175eaeb9a9f383d026098aa837eadd8ba22c11f13a05b9e" +dependencies = [ "rand_core 0.6.3", "subtle", ] @@ -1923,6 +2084,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.21" @@ -2135,7 +2302,18 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61b3c1e8b4f1ca07e6605ea1be903a5f6956aec5c8a67fd44d56076631675ed8" dependencies = [ - "ff", + "ff 0.9.0", + "rand_core 0.6.3", + "subtle", +] + +[[package]] +name = "group" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5ac374b108929de78460075f3dc439fa66df9d8fc77e8f12caa5165fcf0c89" +dependencies = [ + "ff 0.11.0", "rand_core 0.6.3", "subtle", ] @@ -2504,7 +2682,16 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "161ebdfec3c8e3b52bf61c4f3550a1eea4f9579d10dc1b936f3171ebdcd6c443" dependencies = [ - "parity-scale-codec", + "parity-scale-codec 2.3.1", +] + +[[package]] +name = "impl-codec" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba6a270039626615617f3f36d15fc827041df3b78c439da2cadfa47455a77f2f" +dependencies = [ + "parity-scale-codec 3.1.2", ] [[package]] @@ -2657,11 +2844,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3e8e491ed22bc161583a1c77e42313672c483eba6bd9d7afec0f1131d0b9ce" dependencies = [ "cfg-if", - "ecdsa", - "elliptic-curve", + "ecdsa 0.11.1", + "elliptic-curve 0.9.12", "sha2 0.9.9", ] +[[package]] +name = "k256" +version = "0.10.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "19c3a5e0a0b8450278feda242592512e09f61c72e018b8cd5c859482802daf2d" +dependencies = [ + "cfg-if", + "ecdsa 0.13.4", + "elliptic-curve 0.11.12", + "sec1", + "sha3 0.9.1", +] + [[package]] name = "keccak" version = "0.1.0" @@ -3425,7 +3625,7 @@ name = "merkle_proof" version = "0.2.0" dependencies = [ "eth2_hashing", - "ethereum-types", + "ethereum-types 0.12.1", "lazy_static", "quickcheck 0.9.2", "quickcheck_macros", @@ -3964,10 +4164,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "373b1a4c1338d9cd3d1fa53b3a11bdab5ab6bd80a20f7f7becd76953ae2be909" dependencies = [ "arrayvec", - "bitvec", + "bitvec 0.20.4", "byte-slice-cast", "impl-trait-for-tuples", - "parity-scale-codec-derive", + "parity-scale-codec-derive 2.3.1", + "serde", +] + +[[package]] +name = "parity-scale-codec" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8b44461635bbb1a0300f100a841e571e7d919c81c73075ef5d152ffdb521066" +dependencies = [ + "arrayvec", + "bitvec 1.0.0", + "byte-slice-cast", + "impl-trait-for-tuples", + "parity-scale-codec-derive 3.1.2", "serde", ] @@ -3983,6 +4197,18 @@ dependencies = [ "syn", ] +[[package]] +name = "parity-scale-codec-derive" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c45ed1f39709f5a89338fab50e59816b2e8815f5bb58276e7ddf9afd495f73f8" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "parking_lot" version = "0.11.2" @@ -4159,8 +4385,19 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9c2f795bc591cb3384cb64082a578b89207ac92bb89c9d98c1ea2ace7cd8110" dependencies = [ - "der", - "spki", + "der 0.3.5", + "spki 0.3.0", +] + +[[package]] +name = "pkcs8" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0" +dependencies = [ + "der 0.5.1", + "spki 0.5.4", + "zeroize", ] [[package]] @@ -4239,7 +4476,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05e4722c697a58a99d5d06a08c30821d7c082a4632198de1eaa5a6c22ef42373" dependencies = [ "fixed-hash", - "impl-codec", + "impl-codec 0.5.1", + "impl-rlp", + "impl-serde", + "uint", +] + +[[package]] +name = "primitive-types" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28720988bff275df1f51b171e1b2a18c30d194c4d2b61defdacecd625a5d94a" +dependencies = [ + "fixed-hash", + "impl-codec 0.6.0", "impl-rlp", "impl-serde", "uint", @@ -4519,6 +4769,12 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "643f8f41a8ebc4c5dc4515c82bb8abd397b527fc20fd681b7c011c2aee5d44fb" +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.7.3" @@ -4726,6 +4982,17 @@ dependencies = [ "quick-error", ] +[[package]] +name = "rfc6979" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96ef608575f6392792f9ecf7890c00086591d29a83910939d430753f7c050525" +dependencies = [ + "crypto-bigint", + "hmac 0.11.0", + "zeroize", +] + [[package]] name = "ring" version = "0.16.20" @@ -4757,6 +5024,17 @@ dependencies = [ "rustc-hex", ] +[[package]] +name = "rlp-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e33d7b2abe0c340d8797fe2907d3f20d3b5ea5908683618bfe80df7f621f672a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "rpassword" version = "5.0.1" @@ -4966,6 +5244,19 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sec1" +version = "0.2.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "08da66b8b0965a5555b6bd6639e68ccba85e1e2506f5fbb089e93f8a04e1a2d1" +dependencies = [ + "der 0.5.1", + "generic-array", + "pkcs8 0.8.0", + "subtle", + "zeroize", +] + [[package]] name = "secp256k1" version = "0.21.3" @@ -5200,6 +5491,16 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sha3" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "881bf8156c87b6301fc5ca6b27f11eeb2761224c7081e69b409d5a1951a70c86" +dependencies = [ + "digest 0.10.3", + "keccak", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -5522,7 +5823,17 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dae7e047abc519c96350e9484a96c6bf1492348af912fd3446dd2dc323f6268" dependencies = [ - "der", + "der 0.3.5", +] + +[[package]] +name = "spki" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27" +dependencies = [ + "base64ct", + "der 0.5.1", ] [[package]] @@ -5639,14 +5950,15 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "superstruct" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e623e69a04a6352677c1f892027e14e034dfc6c4aabed0a4a0be9c1a0a46cee" +checksum = "95a99807a055ff4ff5d249bb84c80d9eabb55ca3c452187daae43fd5b51ef695" dependencies = [ "darling", "itertools", "proc-macro2", "quote", + "smallvec", "syn", ] @@ -5656,7 +5968,7 @@ version = "0.2.0" dependencies = [ "criterion", "eth2_hashing", - "ethereum-types", + "ethereum-types 0.12.1", ] [[package]] @@ -6131,7 +6443,7 @@ dependencies = [ "eth2_hashing", "eth2_ssz", "eth2_ssz_derive", - "ethereum-types", + "ethereum-types 0.12.1", "rand 0.8.5", "smallvec", "tree_hash_derive", @@ -6250,7 +6562,7 @@ dependencies = [ "eth2_ssz", 
"eth2_ssz_derive", "eth2_ssz_types", - "ethereum-types", + "ethereum-types 0.12.1", "hex", "int_to_bytes", "itertools", @@ -6724,8 +7036,8 @@ dependencies = [ "base64", "bytes", "derive_more", - "ethabi", - "ethereum-types", + "ethabi 16.0.0", + "ethereum-types 0.12.1", "futures", "futures-timer", "headers", @@ -6942,6 +7254,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" +[[package]] +name = "wyz" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b31594f29d27036c383b53b59ed3476874d518f0efb151b27a4c275141390e" +dependencies = [ + "tap", +] + [[package]] name = "x25519-dalek" version = "1.1.1" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 0f632c4de..022b85fa7 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -59,7 +59,8 @@ strum = { version = "0.24.0", features = ["derive"] } logging = { path = "../../common/logging" } execution_layer = { path = "../execution_layer" } sensitive_url = { path = "../../common/sensitive_url" } -superstruct = "0.4.1" +superstruct = "0.5.0" +hex = "0.4.2" [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 18ddd8b7a..60670c849 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -83,8 +83,11 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; -use store::{Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp}; +use store::{ + DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, +}; use task_executor::ShutdownReason; +use tree_hash::TreeHash; use 
types::beacon_state::CloneConfig; use types::*; @@ -587,7 +590,7 @@ impl BeaconChain { block_root: Hash256, ) -> Result> + '_, Error> { let block = self - .get_block(&block_root)? + .get_blinded_block(&block_root)? .ok_or(Error::MissingBeaconBlock(block_root))?; let state = self .get_state(&block.state_root(), Some(block.slot()))? @@ -752,11 +755,11 @@ impl BeaconChain { &self, request_slot: Slot, skips: WhenSlotSkipped, - ) -> Result>, Error> { + ) -> Result>, Error> { let root = self.block_root_at_slot(request_slot, skips)?; if let Some(block_root) = root { - Ok(self.store.get_block(&block_root)?) + Ok(self.store.get_blinded_block(&block_root)?) } else { Ok(None) } @@ -961,16 +964,14 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - pub fn get_block_checking_early_attester_cache( + pub async fn get_block_checking_early_attester_cache( &self, block_root: &Hash256, ) -> Result>, Error> { - let block_opt = self - .store - .get_block(block_root)? - .or_else(|| self.early_attester_cache.get_block(*block_root)); - - Ok(block_opt) + if let Some(block) = self.early_attester_cache.get_block(*block_root) { + return Ok(Some(block)); + } + self.get_block(block_root).await } /// Returns the block at the given root, if any. @@ -978,11 +979,69 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - pub fn get_block( + pub async fn get_block( &self, block_root: &Hash256, ) -> Result>, Error> { - Ok(self.store.get_block(block_root)?) + // Load block from database, returning immediately if we have the full block w payload + // stored. + let blinded_block = match self.store.try_get_full_block(block_root)? { + Some(DatabaseBlock::Full(block)) => return Ok(Some(block)), + Some(DatabaseBlock::Blinded(block)) => block, + None => return Ok(None), + }; + + // If we only have a blinded block, load the execution payload from the EL. 
+ let block_message = blinded_block.message(); + let execution_payload_header = &block_message + .execution_payload() + .map_err(|_| Error::BlockVariantLacksExecutionPayload(*block_root))? + .execution_payload_header; + + let exec_block_hash = execution_payload_header.block_hash; + + let execution_payload = self + .execution_layer + .as_ref() + .ok_or(Error::ExecutionLayerMissing)? + .get_payload_by_block_hash(exec_block_hash) + .await + .map_err(|e| Error::ExecutionLayerErrorPayloadReconstruction(exec_block_hash, e))? + .ok_or(Error::BlockHashMissingFromExecutionLayer(exec_block_hash))?; + + // Verify payload integrity. + let header_from_payload = ExecutionPayloadHeader::from(&execution_payload); + if header_from_payload != *execution_payload_header { + for txn in &execution_payload.transactions { + debug!( + self.log, + "Reconstructed txn"; + "bytes" => format!("0x{}", hex::encode(&**txn)), + ); + } + + return Err(Error::InconsistentPayloadReconstructed { + slot: blinded_block.slot(), + exec_block_hash, + canonical_payload_root: execution_payload_header.tree_hash_root(), + reconstructed_payload_root: header_from_payload.tree_hash_root(), + canonical_transactions_root: execution_payload_header.transactions_root, + reconstructed_transactions_root: header_from_payload.transactions_root, + }); + } + + // Add the payload to the block to form a full block. + blinded_block + .try_into_full_block(Some(execution_payload)) + .ok_or(Error::AddPayloadLogicError) + .map(Some) + } + + pub fn get_blinded_block( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + Ok(self.store.get_blinded_block(block_root)?) } /// Returns the state at the given root, if any. @@ -3373,7 +3432,8 @@ impl BeaconChain { .map::, _>(Ok) .unwrap_or_else(|| { let beacon_block = self - .get_block(&beacon_block_root)? + .store + .get_full_block(&beacon_block_root)? 
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?; let beacon_state_root = beacon_block.state_root(); @@ -4525,11 +4585,14 @@ impl BeaconChain { /// /// This could be a very expensive operation and should only be done in testing/analysis /// activities. - pub fn chain_dump(&self) -> Result>, Error> { + #[allow(clippy::type_complexity)] + pub fn chain_dump( + &self, + ) -> Result>>, Error> { let mut dump = vec![]; let mut last_slot = BeaconSnapshot { - beacon_block: self.head()?.beacon_block, + beacon_block: self.head()?.beacon_block.into(), beacon_block_root: self.head()?.beacon_block_root, beacon_state: self.head()?.beacon_state, }; @@ -4543,9 +4606,12 @@ impl BeaconChain { break; // Genesis has been reached. } - let beacon_block = self.store.get_block(&beacon_block_root)?.ok_or_else(|| { - Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) - })?; + let beacon_block = self + .store + .get_blinded_block(&beacon_block_root)? + .ok_or_else(|| { + Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) + })?; let beacon_state_root = beacon_block.state_root(); let beacon_state = self .store @@ -4630,7 +4696,7 @@ impl BeaconChain { visited.insert(block_hash); if signed_beacon_block.slot() % T::EthSpec::slots_per_epoch() == 0 { - let block = self.get_block(&block_hash).unwrap().unwrap(); + let block = self.get_blinded_block(&block_hash).unwrap().unwrap(); let state = self .get_state(&block.state_root(), Some(block.slot())) .unwrap() diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 2e90203f2..dc80fb700 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -13,7 +13,8 @@ use std::sync::Arc; use store::{Error as StoreError, HotColdDB, ItemStore}; use superstruct::superstruct; use types::{ - BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, 
Slot, + BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, ExecPayload, Hash256, + Slot, }; #[derive(Debug)] @@ -254,9 +255,9 @@ where self.time = slot } - fn on_verified_block( + fn on_verified_block>( &mut self, - _block: &BeaconBlock, + _block: &BeaconBlock, block_root: Hash256, state: &BeaconState, ) -> Result<(), Self::Error> { @@ -300,7 +301,7 @@ where metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES); let justified_block = self .store - .get_block(&self.justified_checkpoint.root) + .get_blinded_block(&self.justified_checkpoint.root) .map_err(Error::FailedToReadBlock)? .ok_or(Error::MissingBlock(self.justified_checkpoint.root))? .deconstruct() diff --git a/beacon_node/beacon_chain/src/beacon_snapshot.rs b/beacon_node/beacon_chain/src/beacon_snapshot.rs index b9de6e9eb..94adb479c 100644 --- a/beacon_node/beacon_chain/src/beacon_snapshot.rs +++ b/beacon_node/beacon_chain/src/beacon_snapshot.rs @@ -1,19 +1,22 @@ use serde_derive::Serialize; -use types::{beacon_state::CloneConfig, BeaconState, EthSpec, Hash256, SignedBeaconBlock}; +use types::{ + beacon_state::CloneConfig, BeaconState, EthSpec, ExecPayload, FullPayload, Hash256, + SignedBeaconBlock, +}; /// Represents some block and its associated state. Generally, this will be used for tracking the /// head, justified head and finalized head. #[derive(Clone, Serialize, PartialEq, Debug)] -pub struct BeaconSnapshot { - pub beacon_block: SignedBeaconBlock, +pub struct BeaconSnapshot = FullPayload> { + pub beacon_block: SignedBeaconBlock, pub beacon_block_root: Hash256, pub beacon_state: BeaconState, } -impl BeaconSnapshot { +impl> BeaconSnapshot { /// Create a new checkpoint. pub fn new( - beacon_block: SignedBeaconBlock, + beacon_block: SignedBeaconBlock, beacon_block_root: Hash256, beacon_state: BeaconState, ) -> Self { @@ -36,7 +39,7 @@ impl BeaconSnapshot { /// Update all fields of the checkpoint. 
pub fn update( &mut self, - beacon_block: SignedBeaconBlock, + beacon_block: SignedBeaconBlock, beacon_block_root: Hash256, beacon_state: BeaconState, ) { diff --git a/beacon_node/beacon_chain/src/block_reward.rs b/beacon_node/beacon_chain/src/block_reward.rs index 83b204113..74a27d5f7 100644 --- a/beacon_node/beacon_chain/src/block_reward.rs +++ b/beacon_node/beacon_chain/src/block_reward.rs @@ -2,12 +2,12 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta}; use operation_pool::{AttMaxCover, MaxCover}; use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards; -use types::{BeaconBlockRef, BeaconState, EthSpec, Hash256, RelativeEpoch}; +use types::{BeaconBlockRef, BeaconState, EthSpec, ExecPayload, Hash256, RelativeEpoch}; impl BeaconChain { - pub fn compute_block_reward( + pub fn compute_block_reward>( &self, - block: BeaconBlockRef<'_, T::EthSpec>, + block: BeaconBlockRef<'_, T::EthSpec, Payload>, block_root: Hash256, state: &BeaconState, ) -> Result { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 46ce5193b..70ec48cd1 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -78,9 +78,9 @@ use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp use tree_hash::TreeHash; use types::ExecPayload; use types::{ - BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, - ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, - SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch, + EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, + RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, 
Slot, }; const POS_PANDA_BANNER: &str = r#" @@ -567,7 +567,7 @@ pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> { pub block: SignedBeaconBlock, pub block_root: Hash256, pub state: BeaconState, - pub parent_block: SignedBeaconBlock, + pub parent_block: SignedBeaconBlock>, pub confirmation_db_batch: Vec>, pub payload_verification_status: PayloadVerificationStatus, } @@ -1569,7 +1569,7 @@ fn load_parent( // indicate that we don't yet know the parent. let root = block.parent_root(); let parent_block = chain - .get_block(&block.parent_root()) + .get_blinded_block(&block.parent_root()) .map_err(BlockError::BeaconChainError)? .ok_or_else(|| { // Return a `MissingBeaconBlock` error instead of a `ParentUnknown` error since diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index e9860124c..98dcce9d2 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -240,7 +240,7 @@ where .ok_or("Fork choice not found in store")?; let genesis_block = store - .get_block(&chain.genesis_block_root) + .get_blinded_block(&chain.genesis_block_root) .map_err(|e| descriptive_db_error("genesis block", &e))? .ok_or("Genesis block not found in store")?; let genesis_state = store @@ -588,7 +588,7 @@ where // Try to decode the head block according to the current fork, if that fails, try // to backtrack to before the most recent fork. 
let (head_block_root, head_block, head_reverted) = - match store.get_block(&initial_head_block_root) { + match store.get_full_block(&initial_head_block_root) { Ok(Some(block)) => (initial_head_block_root, block, false), Ok(None) => return Err("Head block not found in store".into()), Err(StoreError::SszDecodeError(_)) => { @@ -986,10 +986,10 @@ mod test { assert_eq!( chain .store - .get_block(&Hash256::zero()) + .get_blinded_block(&Hash256::zero()) .expect("should read db") .expect("should find genesis block"), - block, + block.clone().into(), "should store genesis block under zero hash alias" ); assert_eq!( diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 90fd563b6..c036dfe45 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -139,6 +139,18 @@ pub enum BeaconChainError { }, AltairForkDisabled, ExecutionLayerMissing, + BlockVariantLacksExecutionPayload(Hash256), + ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, execution_layer::Error), + BlockHashMissingFromExecutionLayer(ExecutionBlockHash), + InconsistentPayloadReconstructed { + slot: Slot, + exec_block_hash: ExecutionBlockHash, + canonical_payload_root: Hash256, + reconstructed_payload_root: Hash256, + canonical_transactions_root: Hash256, + reconstructed_transactions_root: Hash256, + }, + AddPayloadLogicError, ExecutionForkChoiceUpdateFailed(execution_layer::Error), PrepareProposerBlockingFailed(execution_layer::Error), ExecutionForkChoiceUpdateInvalid { diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 2e18bf87b..08e4cd41e 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -333,7 +333,7 @@ pub async fn prepare_execution_payload, Cold: ItemStore ); let block_iter = ParentRootBlockIterator::fork_tolerant(&store, head_block_root); - process_results(block_iter, |mut 
iter| { + let (block_root, blinded_block) = process_results(block_iter, |mut iter| { iter.find_map(|(block_root, block)| { if block.slot() < fork_epoch.start_slot(E::slots_per_epoch()) { Some((block_root, block)) @@ -69,7 +69,13 @@ pub fn revert_to_fork_boundary, Cold: ItemStore e, CORRUPT_DB_MESSAGE ) })? - .ok_or_else(|| format!("No pre-fork blocks found. {}", CORRUPT_DB_MESSAGE)) + .ok_or_else(|| format!("No pre-fork blocks found. {}", CORRUPT_DB_MESSAGE))?; + + let block = store + .make_full_block(&block_root, blinded_block) + .map_err(|e| format!("Unable to add payload to new head block: {:?}", e))?; + + Ok((block_root, block)) } /// Reset fork choice to the finalized checkpoint of the supplied head state. @@ -97,7 +103,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It let finalized_checkpoint = head_state.finalized_checkpoint(); let finalized_block_root = finalized_checkpoint.root; let finalized_block = store - .get_block(&finalized_block_root) + .get_full_block(&finalized_block_root) .map_err(|e| format!("Error loading finalized block: {:?}", e))? .ok_or_else(|| { format!( diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 234e6c64e..1891362eb 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -9,7 +9,7 @@ use std::borrow::Cow; use std::iter; use std::time::Duration; use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore}; -use types::{Hash256, SignedBeaconBlock, Slot}; +use types::{Hash256, SignedBlindedBeaconBlock, Slot}; /// Use a longer timeout on the pubkey cache. /// @@ -58,7 +58,7 @@ impl BeaconChain { /// Return the number of blocks successfully imported. pub fn import_historical_block_batch( &self, - blocks: &[SignedBeaconBlock], + blocks: Vec>, ) -> Result { let anchor_info = self .store @@ -106,8 +106,9 @@ impl BeaconChain { .into()); } - // Store block in the hot database. 
- hot_batch.push(self.store.block_as_kv_store_op(&block_root, block)); + // Store block in the hot database without payload. + self.store + .blinded_block_as_kv_store_ops(&block_root, block, &mut hot_batch); // Store block roots, including at all skip slots in the freezer DB. for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 5ae762732..2c2ce0aa1 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -391,7 +391,7 @@ impl, Cold: ItemStore> BackgroundMigrator block.state_root(), Ok(None) => { return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into()) @@ -535,7 +535,12 @@ impl, Cold: ItemStore> BackgroundMigrator> = abandoned_blocks .into_iter() .map(Into::into) - .map(StoreOp::DeleteBlock) + .flat_map(|block_root: Hash256| { + [ + StoreOp::DeleteBlock(block_root), + StoreOp::DeleteExecutionPayload(block_root), + ] + }) .chain( abandoned_states .into_iter() @@ -543,7 +548,7 @@ impl, Cold: ItemStore> BackgroundMigrator BeaconChain { } // 2. Check on disk. - if self.store.get_block(&block_root)?.is_some() { + if self.store.get_blinded_block(&block_root)?.is_some() { cache.block_roots.put(block_root, ()); return Ok(true); } diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 6d797ab37..83e0cdd11 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -2,6 +2,7 @@ mod migration_schema_v6; mod migration_schema_v7; mod migration_schema_v8; +mod migration_schema_v9; mod types; use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}; @@ -32,7 +33,7 @@ pub fn migrate_schema( match (from, to) { // Migrating from the current schema version to iself is always OK, a no-op. 
(_, _) if from == to && to == CURRENT_SCHEMA_VERSION => Ok(()), - // Migrate across multiple versions by recursively migrating one step at a time. + // Upgrade across multiple versions by recursively migrating one step at a time. (_, _) if from.as_u64() + 1 < to.as_u64() => { let next = SchemaVersion(from.as_u64() + 1); migrate_schema::(db.clone(), datadir, from, next, log.clone())?; @@ -181,6 +182,17 @@ pub fn migrate_schema( Ok(()) } + // Upgrade from v8 to v9 to separate the execution payloads into their own column. + (SchemaVersion(8), SchemaVersion(9)) => { + migration_schema_v9::upgrade_to_v9::(db.clone(), log)?; + db.store_schema_version(to) + } + // Downgrade from v9 to v8 to ignore the separation of execution payloads + // NOTE: only works before the Bellatrix fork epoch. + (SchemaVersion(9), SchemaVersion(8)) => { + migration_schema_v9::downgrade_from_v9::(db.clone(), log)?; + db.store_schema_version(to) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs index ebf89ec22..4cede798e 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v7.rs @@ -31,7 +31,7 @@ pub(crate) fn update_with_reinitialized_fork_choice( .finalized_checkpoint .root; let anchor_block = db - .get_block(&anchor_block_root) + .get_full_block_prior_to_v9(&anchor_block_root) .map_err(|e| format!("{:?}", e))? 
.ok_or_else(|| "Missing anchor beacon block".to_string())?; let anchor_state = db diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v8.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v8.rs index 5998eaa12..ef3f7857f 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v8.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v8.rs @@ -34,7 +34,7 @@ pub fn update_fork_choice( // before schema v8 the cache would always miss on skipped slots. for item in balances_cache.items { // Drop any blocks that aren't found, they're presumably too old and this is only a cache. - if let Some(block) = db.get_block(&item.block_root)? { + if let Some(block) = db.get_full_block_prior_to_v9(&item.block_root)? { fork_choice_store.balances_cache.items.push(CacheItemV8 { block_root: item.block_root, epoch: block.slot().epoch(T::EthSpec::slots_per_epoch()), diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs new file mode 100644 index 000000000..e2c48d5c8 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v9.rs @@ -0,0 +1,176 @@ +use crate::beacon_chain::BeaconChainTypes; +use slog::{debug, error, info, Logger}; +use slot_clock::SlotClock; +use std::sync::Arc; +use std::time::Duration; +use store::{DBColumn, Error, HotColdDB, KeyValueStore}; +use types::{EthSpec, Hash256, Slot}; + +const OPS_PER_BLOCK_WRITE: usize = 2; + +/// The slot clock isn't usually available before the database is initialized, so we construct a +/// temporary slot clock by reading the genesis state. It should always exist if the database is +/// initialized at a prior schema version, however we still handle the lack of genesis state +/// gracefully. +fn get_slot_clock( + db: &HotColdDB, + log: &Logger, +) -> Result, Error> { + // At schema v8 the genesis block must be a *full* block (with payload). 
In all likeliness it + // actually has no payload. + let spec = db.get_chain_spec(); + let genesis_block = if let Some(block) = db.get_full_block_prior_to_v9(&Hash256::zero())? { + block + } else { + error!(log, "Missing genesis block"); + return Ok(None); + }; + let genesis_state = + if let Some(state) = db.get_state(&genesis_block.state_root(), Some(Slot::new(0)))? { + state + } else { + error!(log, "Missing genesis state"; "state_root" => ?genesis_block.state_root()); + return Ok(None); + }; + Ok(Some(T::SlotClock::new( + spec.genesis_slot, + Duration::from_secs(genesis_state.genesis_time()), + Duration::from_secs(spec.seconds_per_slot), + ))) +} + +pub fn upgrade_to_v9( + db: Arc>, + log: Logger, +) -> Result<(), Error> { + // This upgrade is a no-op if the Bellatrix fork epoch has not already passed. This migration + // was implemented before the activation of Bellatrix on all networks except Kiln, so the only + // users who will need to wait for the slow copying migration are Kiln users. + let slot_clock = if let Some(slot_clock) = get_slot_clock::(&db, &log)? { + slot_clock + } else { + error!( + log, + "Unable to complete migration because genesis state or genesis block is missing" + ); + return Err(Error::SlotClockUnavailableForMigration); + }; + + let current_epoch = if let Some(slot) = slot_clock.now() { + slot.epoch(T::EthSpec::slots_per_epoch()) + } else { + return Ok(()); + }; + + let bellatrix_fork_epoch = if let Some(fork_epoch) = db.get_chain_spec().bellatrix_fork_epoch { + fork_epoch + } else { + info!( + log, + "Upgrading database schema to v9 (no-op)"; + "info" => "To downgrade before the merge run `lighthouse db migrate`" + ); + return Ok(()); + }; + + if current_epoch >= bellatrix_fork_epoch { + info!( + log, + "Upgrading database schema to v9"; + "info" => "This will take several minutes. Each block will be read from and \ + re-written to the database. You may safely exit now (Ctrl-C) and resume \ + the migration later. 
Downgrading is no longer possible." + ); + + for res in db.hot_db.iter_column_keys(DBColumn::BeaconBlock) { + let block_root = res?; + let block = match db.get_full_block_prior_to_v9(&block_root) { + // A pre-v9 block is present. + Ok(Some(block)) => block, + // A block is missing. + Ok(None) => return Err(Error::BlockNotFound(block_root)), + // There was an error reading a pre-v9 block. Try reading it as a post-v9 block. + Err(_) => { + if db.try_get_full_block(&block_root)?.is_some() { + // The block is present as a post-v9 block, assume that it was already + // correctly migrated. + continue; + } else { + // This scenario should not be encountered since a prior check has ensured + // that this block exists. + return Err(Error::V9MigrationFailure(block_root)); + } + } + }; + + if block.message().execution_payload().is_ok() { + // Overwrite block with blinded block and store execution payload separately. + debug!( + log, + "Rewriting Bellatrix block"; + "block_root" => ?block_root, + ); + + let mut kv_batch = Vec::with_capacity(OPS_PER_BLOCK_WRITE); + db.block_as_kv_store_ops(&block_root, block, &mut kv_batch)?; + db.hot_db.do_atomically(kv_batch)?; + } + } + } else { + info!( + log, + "Upgrading database schema to v9 (no-op)"; + "info" => "To downgrade before the merge run `lighthouse db migrate`" + ); + } + + Ok(()) +} + +// This downgrade is conditional and will only succeed if the Bellatrix fork epoch hasn't been +// reached. +pub fn downgrade_from_v9( + db: Arc>, + log: Logger, +) -> Result<(), Error> { + let slot_clock = if let Some(slot_clock) = get_slot_clock::(&db, &log)? 
{ + slot_clock + } else { + error!( + log, + "Unable to complete migration because genesis state or genesis block is missing" + ); + return Err(Error::SlotClockUnavailableForMigration); + }; + + let current_epoch = if let Some(slot) = slot_clock.now() { + slot.epoch(T::EthSpec::slots_per_epoch()) + } else { + return Ok(()); + }; + + let bellatrix_fork_epoch = if let Some(fork_epoch) = db.get_chain_spec().bellatrix_fork_epoch { + fork_epoch + } else { + info!( + log, + "Downgrading database schema from v9"; + "info" => "You need to upgrade to v9 again before the merge" + ); + return Ok(()); + }; + + if current_epoch >= bellatrix_fork_epoch { + error!( + log, + "Downgrading from schema v9 after the Bellatrix fork epoch is not supported"; + "current_epoch" => current_epoch, + "bellatrix_fork_epoch" => bellatrix_fork_epoch, + "reason" => "You need a v9 schema database to run on a merged version of Prater or \ + mainnet. On Kiln, you have to re-sync", + ); + Err(Error::ResyncRequiredForExecutionPayloadSeparation) + } else { + Ok(()) + } +} diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs index f4bbae8a3..558558136 100644 --- a/beacon_node/beacon_chain/src/snapshot_cache.rs +++ b/beacon_node/beacon_chain/src/snapshot_cache.rs @@ -3,8 +3,8 @@ use itertools::process_results; use std::cmp; use std::time::Duration; use types::{ - beacon_state::CloneConfig, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, - Slot, + beacon_state::CloneConfig, BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, + SignedBeaconBlock, Slot, }; /// The default size of the cache. @@ -23,7 +23,7 @@ pub struct PreProcessingSnapshot { pub pre_state: BeaconState, /// This value is only set to `Some` if the `pre_state` was *not* advanced forward. 
pub beacon_state_root: Option, - pub beacon_block: SignedBeaconBlock, + pub beacon_block: SignedBeaconBlock>, pub beacon_block_root: Hash256, } @@ -33,7 +33,7 @@ impl From> for PreProcessingSnapshot { Self { pre_state: snapshot.beacon_state, beacon_state_root, - beacon_block: snapshot.beacon_block, + beacon_block: snapshot.beacon_block.into(), beacon_block_root: snapshot.beacon_block_root, } } @@ -63,7 +63,7 @@ impl CacheItem { Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none()); PreProcessingSnapshot { - beacon_block: self.beacon_block, + beacon_block: self.beacon_block.into(), beacon_block_root: self.beacon_block_root, pre_state: self.pre_state.unwrap_or(self.beacon_state), beacon_state_root, @@ -76,7 +76,7 @@ impl CacheItem { Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none()); PreProcessingSnapshot { - beacon_block: self.beacon_block.clone(), + beacon_block: self.beacon_block.clone().into(), beacon_block_root: self.beacon_block_root, pre_state: self .pre_state diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 7263bf051..3c8299f16 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -528,8 +528,11 @@ where self.chain.slot().unwrap() } - pub fn get_block(&self, block_hash: SignedBeaconBlockHash) -> Option> { - self.chain.get_block(&block_hash.into()).unwrap() + pub fn get_block( + &self, + block_hash: SignedBeaconBlockHash, + ) -> Option>> { + self.chain.get_blinded_block(&block_hash.into()).unwrap() } pub fn block_exists(&self, block_hash: SignedBeaconBlockHash) -> bool { diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 189d3bade..0ea5debc3 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -55,11 +55,15 @@ fn produces_attestations() { 
Slot::from(num_blocks_produced) }; - let block = chain + let blinded_block = chain .block_at_slot(block_slot, WhenSlotSkipped::Prev) .expect("should get block") .expect("block should not be skipped"); - let block_root = block.message().tree_hash_root(); + let block_root = blinded_block.message().tree_hash_root(); + let block = chain + .store + .make_full_block(&block_root, blinded_block) + .unwrap(); let epoch_boundary_slot = state .current_epoch() diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 00bf9fa9a..2fe8818a9 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -975,7 +975,7 @@ fn attestation_that_skips_epochs() { let block_slot = harness .chain .store - .get_block(&block_root) + .get_blinded_block(&block_root) .expect("should not error getting block") .expect("should find attestation block") .message() diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9acfba17b..ca65b05fd 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -46,6 +46,18 @@ fn get_chain_segment() -> Vec> { .chain_dump() .expect("should dump chain") .into_iter() + .map(|snapshot| { + let full_block = harness + .chain + .store + .make_full_block(&snapshot.beacon_block_root, snapshot.beacon_block) + .unwrap(); + BeaconSnapshot { + beacon_block_root: snapshot.beacon_block_root, + beacon_block: full_block, + beacon_state: snapshot.beacon_state, + } + }) .skip(1) .collect() } diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 94520bd7a..1aa9844a3 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -71,7 +71,7 @@ impl 
InvalidPayloadRig { fn block_hash(&self, block_root: Hash256) -> ExecutionBlockHash { self.harness .chain - .get_block(&block_root) + .get_blinded_block(&block_root) .unwrap() .unwrap() .message() @@ -273,7 +273,12 @@ impl InvalidPayloadRig { } assert_eq!( - self.harness.chain.get_block(&block_root).unwrap().unwrap(), + self.harness + .chain + .store + .get_full_block(&block_root) + .unwrap() + .unwrap(), block, "block from db must match block imported" ); @@ -311,7 +316,11 @@ impl InvalidPayloadRig { assert_eq!(block_in_forkchoice, None); assert!( - self.harness.chain.get_block(&block_root).unwrap().is_none(), + self.harness + .chain + .get_blinded_block(&block_root) + .unwrap() + .is_none(), "invalid block cannot be accessed via get_block" ); } else { @@ -427,7 +436,7 @@ fn justified_checkpoint_becomes_invalid() { let parent_root_of_justified = rig .harness .chain - .get_block(&justified_checkpoint.root) + .get_blinded_block(&justified_checkpoint.root) .unwrap() .unwrap() .parent_root(); @@ -643,7 +652,13 @@ fn invalidates_all_descendants() { assert!(rig.execution_status(fork_block_root).is_invalid()); for root in blocks { - let slot = rig.harness.chain.get_block(&root).unwrap().unwrap().slot(); + let slot = rig + .harness + .chain + .get_blinded_block(&root) + .unwrap() + .unwrap() + .slot(); // Fork choice doesn't have info about pre-finalization, nothing to check here. if slot < finalized_slot { @@ -707,7 +722,13 @@ fn switches_heads() { assert!(rig.execution_status(fork_block_root).is_optimistic()); for root in blocks { - let slot = rig.harness.chain.get_block(&root).unwrap().unwrap().slot(); + let slot = rig + .harness + .chain + .get_blinded_block(&root) + .unwrap() + .unwrap() + .slot(); // Fork choice doesn't have info about pre-finalization, nothing to check here. if slot < finalized_slot { @@ -739,9 +760,17 @@ fn invalid_during_processing() { ]; // 0 should be present in the chain. 
- assert!(rig.harness.chain.get_block(&roots[0]).unwrap().is_some()); + assert!(rig + .harness + .chain + .get_blinded_block(&roots[0]) + .unwrap() + .is_some()); // 1 should *not* be present in the chain. - assert_eq!(rig.harness.chain.get_block(&roots[1]).unwrap(), None); + assert_eq!( + rig.harness.chain.get_blinded_block(&roots[1]).unwrap(), + None + ); // 2 should be the head. let head = rig.harness.chain.head_info().unwrap(); assert_eq!(head.block_root, roots[2]); @@ -760,7 +789,7 @@ fn invalid_after_optimistic_sync() { ]; for root in &roots { - assert!(rig.harness.chain.get_block(root).unwrap().is_some()); + assert!(rig.harness.chain.get_blinded_block(root).unwrap().is_some()); } // 2 should be the head. diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 0780aca61..771295c41 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -313,7 +313,10 @@ fn epoch_boundary_state_attestation_processing() { for (attestation, subnet_id) in late_attestations.into_iter().flatten() { // load_epoch_boundary_state is idempotent! 
let block_root = attestation.data.beacon_block_root; - let block = store.get_block(&block_root).unwrap().expect("block exists"); + let block = store + .get_blinded_block(&block_root) + .unwrap() + .expect("block exists"); let epoch_boundary_state = store .load_epoch_boundary_state(&block.state_root()) .expect("no error") @@ -603,7 +606,7 @@ fn delete_blocks_and_states() { ); let faulty_head_block = store - .get_block(&faulty_head.into()) + .get_blinded_block(&faulty_head.into()) .expect("no errors") .expect("faulty head block exists"); @@ -645,7 +648,7 @@ fn delete_blocks_and_states() { break; } store.delete_block(&block_root).unwrap(); - assert_eq!(store.get_block(&block_root).unwrap(), None); + assert_eq!(store.get_blinded_block(&block_root).unwrap(), None); } // Deleting frozen states should do nothing @@ -890,7 +893,12 @@ fn shuffling_compatible_short_fork() { } fn get_state_for_block(harness: &TestHarness, block_root: Hash256) -> BeaconState { - let head_block = harness.chain.get_block(&block_root).unwrap().unwrap(); + let head_block = harness + .chain + .store + .get_blinded_block(&block_root) + .unwrap() + .unwrap(); harness .chain .get_state(&head_block.state_root(), Some(head_block.slot())) @@ -1695,7 +1703,7 @@ fn check_all_blocks_exist<'a>( blocks: impl Iterator, ) { for &block_hash in blocks { - let block = harness.chain.get_block(&block_hash.into()).unwrap(); + let block = harness.chain.get_blinded_block(&block_hash.into()).unwrap(); assert!( block.is_some(), "expected block {:?} to be in DB", @@ -1742,7 +1750,7 @@ fn check_no_blocks_exist<'a>( blocks: impl Iterator, ) { for &block_hash in blocks { - let block = harness.chain.get_block(&block_hash.into()).unwrap(); + let block = harness.chain.get_blinded_block(&block_hash.into()).unwrap(); assert!( block.is_none(), "did not expect block {:?} to be in the DB", @@ -1988,7 +1996,12 @@ fn weak_subjectivity_sync() { .unwrap() .unwrap(); let wss_checkpoint = 
harness.chain.head_info().unwrap().finalized_checkpoint; - let wss_block = harness.get_block(wss_checkpoint.root.into()).unwrap(); + let wss_block = harness + .chain + .store + .get_full_block(&wss_checkpoint.root) + .unwrap() + .unwrap(); let wss_state = full_store .get_state(&wss_block.state_root(), None) .unwrap() @@ -2042,8 +2055,14 @@ fn weak_subjectivity_sync() { for snapshot in new_blocks { let block = &snapshot.beacon_block; + let full_block = harness + .chain + .store + .make_full_block(&snapshot.beacon_block_root, block.clone()) + .unwrap(); + beacon_chain.slot_clock.set_slot(block.slot().as_u64()); - beacon_chain.process_block(block.clone()).unwrap(); + beacon_chain.process_block(full_block).unwrap(); beacon_chain.fork_choice().unwrap(); // Check that the new block's state can be loaded correctly. @@ -2085,13 +2104,13 @@ fn weak_subjectivity_sync() { .map(|s| s.beacon_block.clone()) .collect::>(); beacon_chain - .import_historical_block_batch(&historical_blocks) + .import_historical_block_batch(historical_blocks.clone()) .unwrap(); assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0); // Resupplying the blocks should not fail, they can be safely ignored. 
beacon_chain - .import_historical_block_batch(&historical_blocks) + .import_historical_block_batch(historical_blocks) .unwrap(); // The forwards iterator should now match the original chain @@ -2114,7 +2133,7 @@ fn weak_subjectivity_sync() { .unwrap() .map(Result::unwrap) { - let block = store.get_block(&block_root).unwrap().unwrap(); + let block = store.get_blinded_block(&block_root).unwrap().unwrap(); assert_eq!(block.slot(), slot); } @@ -2574,7 +2593,7 @@ fn check_iterators(harness: &TestHarness) { } fn get_finalized_epoch_boundary_blocks( - dump: &[BeaconSnapshot], + dump: &[BeaconSnapshot>], ) -> HashSet { dump.iter() .cloned() @@ -2582,7 +2601,9 @@ fn get_finalized_epoch_boundary_blocks( .collect() } -fn get_blocks(dump: &[BeaconSnapshot]) -> HashSet { +fn get_blocks( + dump: &[BeaconSnapshot>], +) -> HashSet { dump.iter() .cloned() .map(|checkpoint| checkpoint.beacon_block_root.into()) diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 2a0aa35b1..7b17937a2 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -744,7 +744,11 @@ fn block_roots_skip_slot_behaviour() { "WhenSlotSkipped::Prev should accurately return the prior skipped block" ); - let expected_block = harness.chain.get_block(&skipped_root).unwrap().unwrap(); + let expected_block = harness + .chain + .get_blinded_block(&skipped_root) + .unwrap() + .unwrap(); assert_eq!( harness @@ -782,7 +786,11 @@ fn block_roots_skip_slot_behaviour() { "WhenSlotSkipped::None and WhenSlotSkipped::Prev should be equal on non-skipped slot" ); - let expected_block = harness.chain.get_block(&skips_prev).unwrap().unwrap(); + let expected_block = harness + .chain + .get_blinded_block(&skips_prev) + .unwrap() + .unwrap(); assert_eq!( harness diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index 6cd49e9f6..0351b5e43 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ 
b/beacon_node/execution_layer/Cargo.toml @@ -35,3 +35,4 @@ rand = "0.8.5" zeroize = { version = "1.4.2", features = ["zeroize_derive"] } lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lazy_static = "1.4.0" +ethers-core = { git = "https://github.com/gakonst/ethers-rs", rev = "02ad93a1cfb7b62eb051c77c61dc4c0218428e4a" } diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index ad14ceb51..9eb98cecb 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,13 +1,14 @@ use crate::engines::ForkChoiceState; use async_trait::async_trait; use eth1::http::RpcError; +pub use ethers_core::types::Transaction; pub use json_structures::TransitionConfigurationV1; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use slog::Logger; pub use types::{ - Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, Hash256, - Uint256, + Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, FixedVector, + Hash256, Uint256, VariableList, }; pub mod auth; @@ -46,6 +47,8 @@ pub enum Error { prev_randao: Hash256, suggested_fee_recipient: Address, }, + DeserializeTransaction(ssz_types::Error), + DeserializeTransactions(ssz_types::Error), } impl From for Error { @@ -109,6 +112,9 @@ pub enum BlockByNumberQuery<'a> { Tag(&'a str), } +/// Representation of an exection block with enough detail to determine the terminal PoW block. +/// +/// See `get_pow_block_hash_at_total_difficulty`. #[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ExecutionBlock { @@ -120,6 +126,35 @@ pub struct ExecutionBlock { pub total_difficulty: Uint256, } +/// Representation of an exection block with enough detail to reconstruct a payload. 
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExecutionBlockWithTransactions { + pub parent_hash: ExecutionBlockHash, + #[serde(alias = "miner")] + pub fee_recipient: Address, + pub state_root: Hash256, + pub receipts_root: Hash256, + #[serde(with = "ssz_types::serde_utils::hex_fixed_vec")] + pub logs_bloom: FixedVector, + #[serde(alias = "mixHash")] + pub prev_randao: Hash256, + #[serde(rename = "number", with = "eth2_serde_utils::u64_hex_be")] + pub block_number: u64, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub gas_limit: u64, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub gas_used: u64, + #[serde(with = "eth2_serde_utils::u64_hex_be")] + pub timestamp: u64, + #[serde(with = "ssz_types::serde_utils::hex_var_list")] + pub extra_data: VariableList, + pub base_fee_per_gas: Uint256, + #[serde(rename = "hash")] + pub block_hash: ExecutionBlockHash, + pub transactions: Vec, +} + #[derive(Clone, Copy, Debug, PartialEq)] pub struct PayloadAttributes { pub timestamp: u64, diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 718b08534..179045ccf 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -149,7 +149,7 @@ impl HttpJsonRpc { .await } - pub async fn get_block_by_hash<'a>( + pub async fn get_block_by_hash( &self, block_hash: ExecutionBlockHash, ) -> Result, Error> { @@ -159,6 +159,15 @@ impl HttpJsonRpc { .await } + pub async fn get_block_by_hash_with_txns( + &self, + block_hash: ExecutionBlockHash, + ) -> Result>, Error> { + let params = json!([block_hash, true]); + self.rpc_request(ETH_GET_BLOCK_BY_HASH, params, ETH_GET_BLOCK_BY_HASH_TIMEOUT) + .await + } + pub async fn new_payload_v1( &self, execution_payload: ExecutionPayload, diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 
479158b98..023cfa6e3 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -1183,6 +1183,64 @@ impl ExecutionLayer { } } + pub async fn get_payload_by_block_hash( + &self, + hash: ExecutionBlockHash, + ) -> Result>, Error> { + self.engines() + .first_success(|engine| async move { + self.get_payload_by_block_hash_from_engine(engine, hash) + .await + }) + .await + .map_err(Error::EngineErrors) + } + + async fn get_payload_by_block_hash_from_engine( + &self, + engine: &Engine, + hash: ExecutionBlockHash, + ) -> Result>, ApiError> { + let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BY_BLOCK_HASH); + + if hash == ExecutionBlockHash::zero() { + return Ok(Some(ExecutionPayload::default())); + } + + let block = if let Some(block) = engine.api.get_block_by_hash_with_txns::(hash).await? { + block + } else { + return Ok(None); + }; + + let transactions = VariableList::new( + block + .transactions + .into_iter() + .map(|transaction| VariableList::new(transaction.rlp().to_vec())) + .collect::>() + .map_err(ApiError::DeserializeTransaction)?, + ) + .map_err(ApiError::DeserializeTransactions)?; + + Ok(Some(ExecutionPayload { + parent_hash: block.parent_hash, + fee_recipient: block.fee_recipient, + state_root: block.state_root, + receipts_root: block.receipts_root, + logs_bloom: block.logs_bloom, + prev_randao: block.prev_randao, + block_number: block.block_number, + gas_limit: block.gas_limit, + gas_used: block.gas_used, + timestamp: block.timestamp, + extra_data: block.extra_data, + base_fee_per_gas: block.base_fee_per_gas, + block_hash: block.block_hash, + transactions, + })) + } + pub async fn propose_blinded_beacon_block( &self, block: &SignedBeaconBlock>, diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs index 4a761c8e4..356c5a46d 100644 --- a/beacon_node/execution_layer/src/metrics.rs +++ b/beacon_node/execution_layer/src/metrics.rs @@ -31,4 +31,8 @@ 
lazy_static::lazy_static! { "Indicates hits or misses for already having prepared a payload id before payload production", &["event"] ); + pub static ref EXECUTION_LAYER_GET_PAYLOAD_BY_BLOCK_HASH: Result = try_create_histogram( + "execution_layer_get_payload_by_block_hash_time", + "Time to reconstruct a payload from the EE using eth_getBlockByHash" + ); } diff --git a/beacon_node/http_api/src/attestation_performance.rs b/beacon_node/http_api/src/attestation_performance.rs index 5cd9894ad..2b4543656 100644 --- a/beacon_node/http_api/src/attestation_performance.rs +++ b/beacon_node/http_api/src/attestation_performance.rs @@ -7,7 +7,7 @@ use state_processing::{ per_epoch_processing::EpochProcessingSummary, BlockReplayError, BlockReplayer, }; use std::sync::Arc; -use types::{BeaconState, BeaconStateError, EthSpec, Hash256, SignedBeaconBlock}; +use types::{BeaconState, BeaconStateError, EthSpec, Hash256}; use warp_utils::reject::{beacon_chain_error, custom_bad_request, custom_server_error}; const MAX_REQUEST_RANGE_EPOCHS: usize = 100; @@ -112,7 +112,7 @@ pub fn get_attestation_performance( ) })?; let first_block = chain - .get_block(first_block_root) + .get_blinded_block(first_block_root) .and_then(|maybe_block| { maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root)) }) @@ -120,7 +120,7 @@ pub fn get_attestation_performance( // Load the block of the prior slot which will be used to build the starting state. 
let prior_block = chain - .get_block(&first_block.parent_root()) + .get_blinded_block(&first_block.parent_root()) .and_then(|maybe_block| { maybe_block .ok_or_else(|| BeaconChainError::MissingBeaconBlock(first_block.parent_root())) @@ -197,13 +197,13 @@ pub fn get_attestation_performance( .iter() .map(|root| { chain - .get_block(root) + .get_blinded_block(root) .and_then(|maybe_block| { maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root)) }) .map_err(beacon_chain_error) }) - .collect::>, _>>()?; + .collect::, _>>()?; replayer = replayer .apply_blocks(blocks, None) diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index c21701f3a..727215bfc 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -1,7 +1,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes, WhenSlotSkipped}; use eth2::types::BlockId as CoreBlockId; use std::str::FromStr; -use types::{Hash256, SignedBeaconBlock, Slot}; +use types::{BlindedPayload, Hash256, SignedBeaconBlock, Slot}; /// Wraps `eth2::types::BlockId` and provides a simple way to obtain a block or root for a given /// `BlockId`. @@ -52,7 +52,55 @@ impl BlockId { } /// Return the `SignedBeaconBlock` identified by `self`. 
- pub fn block( + pub fn blinded_block( + &self, + chain: &BeaconChain, + ) -> Result>, warp::Rejection> { + match &self.0 { + CoreBlockId::Head => chain + .head_beacon_block() + .map(Into::into) + .map_err(warp_utils::reject::beacon_chain_error), + CoreBlockId::Slot(slot) => { + let root = self.root(chain)?; + chain + .get_blinded_block(&root) + .map_err(warp_utils::reject::beacon_chain_error) + .and_then(|block_opt| match block_opt { + Some(block) => { + if block.slot() != *slot { + return Err(warp_utils::reject::custom_not_found(format!( + "slot {} was skipped", + slot + ))); + } + Ok(block) + } + None => Err(warp_utils::reject::custom_not_found(format!( + "beacon block with root {}", + root + ))), + }) + } + _ => { + let root = self.root(chain)?; + chain + .get_blinded_block(&root) + .map_err(warp_utils::reject::beacon_chain_error) + .and_then(|root_opt| { + root_opt.ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "beacon block with root {}", + root + )) + }) + }) + } + } + } + + /// Return the `SignedBeaconBlock` identified by `self`. 
+ pub async fn full_block( &self, chain: &BeaconChain, ) -> Result, warp::Rejection> { @@ -64,6 +112,7 @@ impl BlockId { let root = self.root(chain)?; chain .get_block(&root) + .await .map_err(warp_utils::reject::beacon_chain_error) .and_then(|block_opt| match block_opt { Some(block) => { @@ -85,6 +134,7 @@ impl BlockId { let root = self.root(chain)?; chain .get_block(&root) + .await .map_err(warp_utils::reject::beacon_chain_error) .and_then(|root_opt| { root_opt.ok_or_else(|| { diff --git a/beacon_node/http_api/src/block_packing_efficiency.rs b/beacon_node/http_api/src/block_packing_efficiency.rs index d948c0d7d..1b924f382 100644 --- a/beacon_node/http_api/src/block_packing_efficiency.rs +++ b/beacon_node/http_api/src/block_packing_efficiency.rs @@ -10,8 +10,8 @@ use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Arc; use types::{ - BeaconCommittee, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, - OwnedBeaconCommittee, RelativeEpoch, SignedBeaconBlock, Slot, + BeaconCommittee, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, Epoch, EthSpec, + Hash256, OwnedBeaconCommittee, RelativeEpoch, SignedBeaconBlock, Slot, }; use warp_utils::reject::{beacon_chain_error, custom_bad_request, custom_server_error}; @@ -104,7 +104,7 @@ impl PackingEfficiencyHandler { fn apply_block( &mut self, - block: &SignedBeaconBlock, + block: &SignedBeaconBlock>, ) -> Result { let block_body = block.message().body(); let attestations = block_body.attestations(); @@ -251,7 +251,7 @@ pub fn get_block_packing_efficiency( .ok_or_else(|| custom_server_error("no blocks were loaded".to_string()))?; let first_block = chain - .get_block(first_block_root) + .get_blinded_block(first_block_root) .and_then(|maybe_block| { maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*first_block_root)) }) @@ -309,7 +309,7 @@ pub fn get_block_packing_efficiency( }; let pre_block_hook = |_state: &mut BeaconState, - block: &SignedBeaconBlock| + 
block: &SignedBeaconBlock<_, BlindedPayload<_>>| -> Result<(), PackingEfficiencyError> { let slot = block.slot(); @@ -363,13 +363,13 @@ pub fn get_block_packing_efficiency( .iter() .map(|root| { chain - .get_block(root) + .get_blinded_block(root) .and_then(|maybe_block| { maybe_block.ok_or(BeaconChainError::MissingBeaconBlock(*root)) }) .map_err(beacon_chain_error) }) - .collect::>, _>>()?; + .collect::, _>>()?; replayer = replayer .apply_blocks(blocks, None) diff --git a/beacon_node/http_api/src/database.rs b/beacon_node/http_api/src/database.rs index 3a7c81ad8..014db8a60 100644 --- a/beacon_node/http_api/src/database.rs +++ b/beacon_node/http_api/src/database.rs @@ -2,7 +2,7 @@ use beacon_chain::store::{metadata::CURRENT_SCHEMA_VERSION, AnchorInfo}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2::lighthouse::DatabaseInfo; use std::sync::Arc; -use types::SignedBeaconBlock; +use types::SignedBlindedBeaconBlock; pub fn info( chain: Arc>, @@ -22,10 +22,10 @@ pub fn info( pub fn historical_blocks( chain: Arc>, - blocks: Vec>, + blocks: Vec>, ) -> Result { chain - .import_historical_block_batch(&blocks) + .import_historical_block_batch(blocks) .map_err(warp_utils::reject::beacon_chain_error)?; let anchor = chain.store.get_anchor_info().ok_or_else(|| { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 7b58ce682..5e28ac6a7 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -48,8 +48,9 @@ use types::{ Attestation, AttesterSlashing, BeaconBlockBodyMerge, BeaconBlockMerge, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, - SignedBeaconBlock, SignedBeaconBlockMerge, SignedContributionAndProof, SignedVoluntaryExit, - Slot, SyncCommitteeMessage, SyncContributionData, + SignedBeaconBlock, SignedBeaconBlockMerge, SignedBlindedBeaconBlock, + 
SignedContributionAndProof, SignedVoluntaryExit, Slot, SyncCommitteeMessage, + SyncContributionData, }; use version::{ add_consensus_version_header, fork_versioned_response, inconsistent_fork_rejection, @@ -826,10 +827,10 @@ pub fn serve( (None, None) => chain .head_beacon_block() .map_err(warp_utils::reject::beacon_chain_error) - .map(|block| (block.canonical_root(), block))?, + .map(|block| (block.canonical_root(), block.into()))?, // Only the parent root parameter, do a forwards-iterator lookup. (None, Some(parent_root)) => { - let parent = BlockId::from_root(parent_root).block(&chain)?; + let parent = BlockId::from_root(parent_root).blinded_block(&chain)?; let (root, _slot) = chain .forwards_iter_block_roots(parent.slot()) .map_err(warp_utils::reject::beacon_chain_error)? @@ -847,14 +848,14 @@ pub fn serve( })?; BlockId::from_root(root) - .block(&chain) + .blinded_block(&chain) .map(|block| (root, block))? } // Slot is supplied, search by slot and optionally filter by // parent root. (Some(slot), parent_root_opt) => { let root = BlockId::from_slot(slot).root(&chain)?; - let block = BlockId::from_root(root).block(&chain)?; + let block = BlockId::from_root(root).blinded_block(&chain)?; // If the parent root was supplied, check that it matches the block // obtained via a slot lookup. 
@@ -899,7 +900,7 @@ pub fn serve( .and_then(|block_id: BlockId, chain: Arc>| { blocking_json_task(move || { let root = block_id.root(&chain)?; - let block = BlockId::from_root(root).block(&chain)?; + let block = BlockId::from_root(root).blinded_block(&chain)?; let canonical = chain .block_root_at_slot(block.slot(), WhenSlotSkipped::None) @@ -1161,8 +1162,8 @@ pub fn serve( block_id: BlockId, chain: Arc>, accept_header: Option| { - blocking_task(move || { - let block = block_id.block(&chain)?; + async move { + let block = block_id.full_block(&chain).await?; let fork_name = block .fork_name(&chain.spec) .map_err(inconsistent_fork_rejection)?; @@ -1181,7 +1182,7 @@ pub fn serve( .map(|res| warp::reply::json(&res).into_response()), } .map(|resp| add_consensus_version_header(resp, fork_name)) - }) + } }, ); @@ -1207,7 +1208,7 @@ pub fn serve( .and_then(|block_id: BlockId, chain: Arc>| { blocking_json_task(move || { block_id - .block(&chain) + .blinded_block(&chain) .map(|block| block.message().body().attestations().clone()) .map(api_types::GenericResponse::from) }) @@ -2786,7 +2787,7 @@ pub fn serve( .and(chain_filter.clone()) .and(log_filter.clone()) .and_then( - |blocks: Vec>, + |blocks: Vec>, chain: Arc>, log: Logger| { info!( diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 98dd3d5a5..b153e9a27 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -762,9 +762,9 @@ impl ApiTester { } } - fn get_block(&self, block_id: BlockId) -> Option> { - let root = self.get_block_root(block_id); - root.and_then(|root| self.chain.get_block(&root).unwrap()) + async fn get_block(&self, block_id: BlockId) -> Option> { + let root = self.get_block_root(block_id)?; + self.chain.get_block(&root).await.unwrap() } pub async fn test_beacon_headers_all_slots(self) -> Self { @@ -859,7 +859,11 @@ impl ApiTester { } } - let block_opt = block_root_opt.and_then(|root| self.chain.get_block(&root).unwrap()); + let 
block_opt = if let Some(root) = block_root_opt { + self.chain.get_block(&root).await.unwrap() + } else { + None + }; if block_opt.is_none() && result.is_none() { continue; @@ -945,7 +949,7 @@ impl ApiTester { pub async fn test_beacon_blocks(self) -> Self { for block_id in self.interesting_block_ids() { - let expected = self.get_block(block_id); + let expected = self.get_block(block_id).await; if let BlockId::Slot(slot) = block_id { if expected.is_none() { @@ -1030,6 +1034,7 @@ impl ApiTester { let expected = self .get_block(block_id) + .await .map(|block| block.message().body().attestations().clone().into()); if let BlockId::Slot(slot) = block_id { diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 95ca7dc27..5ed3614de 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -37,7 +37,7 @@ rand = "0.8.5" directory = { path = "../../common/directory" } regex = "1.5.5" strum = { version = "0.24.0", features = ["derive"] } -superstruct = "0.4.1" +superstruct = "0.5.0" prometheus-client = "0.15.0" unused_port = { path = "../../common/unused_port" } diff --git a/beacon_node/network/src/beacon_processor/mod.rs b/beacon_node/network/src/beacon_processor/mod.rs index eb40be960..513949bcc 100644 --- a/beacon_node/network/src/beacon_processor/mod.rs +++ b/beacon_node/network/src/beacon_processor/mod.rs @@ -1344,6 +1344,7 @@ impl BeaconProcessor { "worker" => worker_id, ); + let sub_executor = executor.clone(); executor.spawn_blocking( move || { let _worker_timer = worker_timer; @@ -1520,7 +1521,15 @@ impl BeaconProcessor { peer_id, request_id, request, - } => worker.handle_blocks_by_range_request(peer_id, request_id, request), + } => { + return worker.handle_blocks_by_range_request( + sub_executor, + send_idle_on_drop, + peer_id, + request_id, + request, + ) + } /* * Processing of blocks by roots requests from other peers. 
*/ @@ -1528,7 +1537,15 @@ impl BeaconProcessor { peer_id, request_id, request, - } => worker.handle_blocks_by_root_request(peer_id, request_id, request), + } => { + return worker.handle_blocks_by_root_request( + sub_executor, + send_idle_on_drop, + peer_id, + request_id, + request, + ) + } Work::UnknownBlockAttestation { message_id, peer_id, diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index f79a65574..2d2196b9e 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -1,4 +1,4 @@ -use crate::beacon_processor::worker::FUTURE_SLOT_TOLERANCE; +use crate::beacon_processor::{worker::FUTURE_SLOT_TOLERANCE, SendOnDrop}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::SyncMessage; @@ -9,6 +9,7 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; use slot_clock::SlotClock; +use task_executor::TaskExecutor; use types::{Epoch, EthSpec, Hash256, Slot}; use super::Worker; @@ -122,38 +123,71 @@ impl Worker { /// Handle a `BlocksByRoot` request from the peer. pub fn handle_blocks_by_root_request( - &self, + self, + executor: TaskExecutor, + send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, ) { - let mut send_block_count = 0; - for root in request.block_roots.iter() { - if let Ok(Some(block)) = self.chain.get_block_checking_early_attester_cache(root) { - self.send_response( - peer_id, - Response::BlocksByRoot(Some(Box::new(block))), - request_id, - ); - send_block_count += 1; - } else { - debug!(self.log, "Peer requested unknown block"; + // Fetching blocks is async because it may have to hit the execution layer for payloads. 
+ executor.spawn( + async move { + let mut send_block_count = 0; + for root in request.block_roots.iter() { + match self + .chain + .get_block_checking_early_attester_cache(root) + .await + { + Ok(Some(block)) => { + self.send_response( + peer_id, + Response::BlocksByRoot(Some(Box::new(block))), + request_id, + ); + send_block_count += 1; + } + Ok(None) => { + debug!( + self.log, + "Peer requested unknown block"; + "peer" => %peer_id, + "request_root" => ?root + ); + } + Err(e) => { + debug!( + self.log, + "Error fetching block for peer"; + "peer" => %peer_id, + "request_root" => ?root, + "error" => ?e, + ); + } + } + } + debug!( + self.log, + "Received BlocksByRoot Request"; "peer" => %peer_id, - "request_root" => ?root); - } - } - debug!(self.log, "Received BlocksByRoot Request"; - "peer" => %peer_id, - "requested" => request.block_roots.len(), - "returned" => send_block_count); + "requested" => request.block_roots.len(), + "returned" => send_block_count + ); - // send stream termination - self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + // send stream termination + self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + drop(send_on_drop); + }, + "load_blocks_by_root_blocks", + ) } /// Handle a `BlocksByRange` request from the peer. 
pub fn handle_blocks_by_range_request( - &self, + self, + executor: TaskExecutor, + send_on_drop: SendOnDrop, peer_id: PeerId, request_id: PeerRequestId, mut req: BlocksByRangeRequest, @@ -228,54 +262,84 @@ impl Worker { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); - let mut blocks_sent = 0; - for root in block_roots { - if let Ok(Some(block)) = self.chain.store.get_block(&root) { - // Due to skip slots, blocks could be out of the range, we ensure they are in the - // range before sending - if block.slot() >= req.start_slot - && block.slot() < req.start_slot + req.count * req.step - { - blocks_sent += 1; - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlocksByRange(Some(Box::new(block))), - id: request_id, - }); + // Fetching blocks is async because it may have to hit the execution layer for payloads. + executor.spawn( + async move { + let mut blocks_sent = 0; + + for root in block_roots { + match self.chain.get_block(&root).await { + Ok(Some(block)) => { + // Due to skip slots, blocks could be out of the range, we ensure they + // are in the range before sending + if block.slot() >= req.start_slot + && block.slot() < req.start_slot + req.count * req.step + { + blocks_sent += 1; + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlocksByRange(Some(Box::new(block))), + id: request_id, + }); + } + } + Ok(None) => { + error!( + self.log, + "Block in the chain is not in the store"; + "request_root" => ?root + ); + break; + } + Err(e) => { + error!( + self.log, + "Error fetching block for peer"; + "block_root" => ?root, + "error" => ?e + ); + break; + } + } } - } else { - error!(self.log, "Block in the chain is not in the store"; - "request_root" => ?root); - } - } - let current_slot = self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| 
self.chain.slot_clock.genesis_slot()); - if blocks_sent < (req.count as usize) { - debug!(self.log, "BlocksByRange Response processed"; - "peer" => %peer_id, - "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent); - } else { - debug!(self.log, "BlocksByRange Response processed"; - "peer" => %peer_id, - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent); - } + if blocks_sent < (req.count as usize) { + debug!( + self.log, + "BlocksByRange Response processed"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent + ); + } else { + debug!( + self.log, + "BlocksByRange Response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blocks_sent + ); + } - // send the stream terminator - self.send_network_message(NetworkMessage::SendResponse { - peer_id, - response: Response::BlocksByRange(None), - id: request_id, - }); + // send the stream terminator + self.send_network_message(NetworkMessage::SendResponse { + peer_id, + response: Response::BlocksByRange(None), + id: request_id, + }); + drop(send_on_drop); + }, + "load_blocks_by_range_blocks", + ); } } diff --git a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs index 082808f88..943ee9cda 100644 --- a/beacon_node/network/src/beacon_processor/worker/sync_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/sync_methods.rs @@ -138,7 +138,7 @@ impl Worker { let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); - match 
self.process_backfill_blocks(&downloaded_blocks) { + match self.process_backfill_blocks(downloaded_blocks) { (_, Ok(_)) => { debug!(self.log, "Backfill batch processed"; "batch_epoch" => epoch, @@ -223,9 +223,10 @@ impl Worker { /// Helper function to process backfill block batches which only consumes the chain and blocks to process. fn process_backfill_blocks( &self, - blocks: &[SignedBeaconBlock], + blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { - match self.chain.import_historical_block_batch(blocks) { + let blinded_blocks = blocks.into_iter().map(Into::into).collect(); + match self.chain.import_historical_block_batch(blinded_blocks) { Ok(imported_blocks) => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_BACKFILL_CHAIN_SEGMENT_SUCCESS_TOTAL, diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 1147d52c4..30ee66074 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -41,6 +41,10 @@ pub enum Error { computed: Hash256, }, BlockReplayError(BlockReplayError), + AddPayloadLogicError, + ResyncRequiredForExecutionPayloadSeparation, + SlotClockUnavailableForMigration, + V9MigrationFailure(Hash256), } pub trait HandleUnavailable { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 2c31f7cf2..fe66a176b 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -18,8 +18,8 @@ use crate::metadata::{ }; use crate::metrics; use crate::{ - get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, - StoreOp, + get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, + PartialBeaconState, StoreItem, StoreOp, }; use leveldb::iterator::LevelDBIterator; use lru::LruCache; @@ -89,6 +89,8 @@ pub enum HotColdDBError { MissingHotStateSummary(Hash256), MissingEpochBoundaryState(Hash256), MissingSplitState(Hash256, Slot), + MissingExecutionPayload(Hash256), + 
MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), @@ -185,6 +187,21 @@ impl HotColdDB, LevelDB> { } } + // Load the previous split slot from the database (if any). This ensures we can + // stop and restart correctly. This needs to occur *before* running any migrations + // because some migrations load states and depend on the split. + if let Some(split) = db.load_split()? { + *db.split.write() = split; + *db.anchor_info.write() = db.load_anchor_info()?; + + info!( + db.log, + "Hot-Cold DB initialized"; + "split_slot" => split.slot, + "split_state" => ?split.state_root + ); + } + // Ensure that the schema version of the on-disk database matches the software. // If the version is mismatched, an automatic migration will be attempted. let db = Arc::new(db); @@ -206,20 +223,6 @@ impl HotColdDB, LevelDB> { } db.store_config()?; - // Load the previous split slot from the database (if any). This ensures we can - // stop and restart correctly. - if let Some(split) = db.load_split()? { - *db.split.write() = split; - *db.anchor_info.write() = db.load_anchor_info()?; - - info!( - db.log, - "Hot-Cold DB initialized"; - "split_slot" => split.slot, - "split_state" => ?split.state_root - ); - } - // Run a garbage collection pass. db.remove_garbage()?; @@ -263,53 +266,150 @@ impl, Cold: ItemStore> HotColdDB block: SignedBeaconBlock, ) -> Result<(), Error> { // Store on disk. - let op = self.block_as_kv_store_op(block_root, &block); - self.hot_db.do_atomically(vec![op])?; - + let mut ops = Vec::with_capacity(2); + let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?; + self.hot_db.do_atomically(ops)?; // Update cache. self.block_cache.lock().put(*block_root, block); - Ok(()) } /// Prepare a signed beacon block for storage in the database. - pub fn block_as_kv_store_op( + /// + /// Return the original block for re-use after storage. 
It's passed by value so it can be + /// cracked open and have its payload extracted. + pub fn block_as_kv_store_ops( &self, key: &Hash256, - block: &SignedBeaconBlock, - ) -> KeyValueStoreOp { - // FIXME(altair): re-add block write/overhead metrics, or remove them - let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes()); - KeyValueStoreOp::PutKeyValue(db_key, block.as_ssz_bytes()) + block: SignedBeaconBlock, + ops: &mut Vec, + ) -> Result, Error> { + // Split block into blinded block and execution payload. + let (blinded_block, payload) = block.into(); + + // Store blinded block. + self.blinded_block_as_kv_store_ops(key, &blinded_block, ops); + + // Store execution payload if present. + if let Some(ref execution_payload) = payload { + ops.push(execution_payload.as_kv_store_op(*key)); + } + + // Re-construct block. This should always succeed. + blinded_block + .try_into_full_block(payload) + .ok_or(Error::AddPayloadLogicError) } - /// Fetch a block from the store. - pub fn get_block(&self, block_root: &Hash256) -> Result>, Error> { + /// Prepare a signed beacon block for storage in the datbase *without* its payload. + pub fn blinded_block_as_kv_store_ops( + &self, + key: &Hash256, + blinded_block: &SignedBeaconBlock>, + ops: &mut Vec, + ) { + let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_bytes()); + ops.push(KeyValueStoreOp::PutKeyValue( + db_key, + blinded_block.as_ssz_bytes(), + )); + } + + pub fn try_get_full_block( + &self, + block_root: &Hash256, + ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); // Check the cache. if let Some(block) = self.block_cache.lock().get(block_root) { metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT); - return Ok(Some(block.clone())); + return Ok(Some(DatabaseBlock::Full(block.clone()))); } - let block = self.get_block_with(block_root, |bytes| { - SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec) - })?; + // Load the blinded block. 
+ let blinded_block = match self.get_blinded_block(block_root)? { + Some(block) => block, + None => return Ok(None), + }; - // Add to cache. - if let Some(ref block) = block { - self.block_cache.lock().put(*block_root, block.clone()); - } + // If the block is after the split point then we should have the full execution payload + // stored in the database. Otherwise, just return the blinded block. + // Hold the split lock so that it can't change. + let split = self.split.read_recursive(); - Ok(block) + let block = if blinded_block.message().execution_payload().is_err() + || blinded_block.slot() >= split.slot + { + // Re-constructing the full block should always succeed here. + let full_block = self.make_full_block(block_root, blinded_block)?; + + // Add to cache. + self.block_cache.lock().put(*block_root, full_block.clone()); + + DatabaseBlock::Full(full_block) + } else { + DatabaseBlock::Blinded(blinded_block) + }; + drop(split); + + Ok(Some(block)) } - /// Fetch a block from the store, ignoring which fork variant it *should* be for. - pub fn get_block_any_variant( + /// Fetch a full block with execution payload from the store. + pub fn get_full_block( &self, block_root: &Hash256, ) -> Result>, Error> { + match self.try_get_full_block(block_root)? { + Some(DatabaseBlock::Full(block)) => Ok(Some(block)), + Some(DatabaseBlock::Blinded(block)) => Err( + HotColdDBError::MissingFullBlockExecutionPayloadPruned(*block_root, block.slot()) + .into(), + ), + None => Ok(None), + } + } + + /// Get a schema V8 or earlier full block by reading it and its payload from disk. + pub fn get_full_block_prior_to_v9( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + self.get_block_with(block_root, |bytes| { + SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec) + }) + } + + /// Convert a blinded block into a full block by loading its execution payload if necessary. 
+ pub fn make_full_block( + &self, + block_root: &Hash256, + blinded_block: SignedBeaconBlock>, + ) -> Result, Error> { + if blinded_block.message().execution_payload().is_ok() { + let execution_payload = self.get_execution_payload(block_root)?; + blinded_block.try_into_full_block(Some(execution_payload)) + } else { + blinded_block.try_into_full_block(None) + } + .ok_or(Error::AddPayloadLogicError) + } + + pub fn get_blinded_block( + &self, + block_root: &Hash256, + ) -> Result>>, Error> { + self.get_block_with(block_root, |bytes| { + SignedBeaconBlock::from_ssz_bytes(bytes, &self.spec) + }) + } + + /// Fetch a block from the store, ignoring which fork variant it *should* be for. + pub fn get_block_any_variant>( + &self, + block_root: &Hash256, + ) -> Result>, Error> { self.get_block_with(block_root, SignedBeaconBlock::any_from_ssz_bytes) } @@ -317,11 +417,11 @@ impl, Cold: ItemStore> HotColdDB /// /// This is useful for e.g. ignoring the slot-indicated fork to forcefully load a block as if it /// were for a different fork. - pub fn get_block_with( + pub fn get_block_with>( &self, block_root: &Hash256, - decoder: impl FnOnce(&[u8]) -> Result, ssz::DecodeError>, - ) -> Result>, Error> { + decoder: impl FnOnce(&[u8]) -> Result, ssz::DecodeError>, + ) -> Result>, Error> { self.hot_db .get_bytes(DBColumn::BeaconBlock.into(), block_root.as_bytes())? .map(|block_bytes| decoder(&block_bytes)) @@ -329,6 +429,15 @@ impl, Cold: ItemStore> HotColdDB .map_err(|e| e.into()) } + /// Load the execution payload for a block from disk. + pub fn get_execution_payload( + &self, + block_root: &Hash256, + ) -> Result, Error> { + self.get_item(block_root)? + .ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into()) + } + /// Determine whether a block exists in the database. 
pub fn block_exists(&self, block_root: &Hash256) -> Result { self.hot_db @@ -339,7 +448,9 @@ impl, Cold: ItemStore> HotColdDB pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> { self.block_cache.lock().pop(block_root); self.hot_db - .key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes()) + .key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())?; + self.hot_db + .key_delete(DBColumn::ExecPayload.into(), block_root.as_bytes()) } pub fn put_state_summary( @@ -550,24 +661,27 @@ impl, Cold: ItemStore> HotColdDB } /// Convert a batch of `StoreOp` to a batch of `KeyValueStoreOp`. - pub fn convert_to_kv_batch(&self, batch: &[StoreOp]) -> Result, Error> { + pub fn convert_to_kv_batch( + &self, + batch: Vec>, + ) -> Result, Error> { let mut key_value_batch = Vec::with_capacity(batch.len()); for op in batch { match op { StoreOp::PutBlock(block_root, block) => { - key_value_batch.push(self.block_as_kv_store_op(block_root, block)); + self.block_as_kv_store_ops(&block_root, *block, &mut key_value_batch)?; } StoreOp::PutState(state_root, state) => { - self.store_hot_state(state_root, state, &mut key_value_batch)?; + self.store_hot_state(&state_root, state, &mut key_value_batch)?; } StoreOp::PutStateSummary(state_root, summary) => { - key_value_batch.push(summary.as_kv_store_op(*state_root)); + key_value_batch.push(summary.as_kv_store_op(state_root)); } StoreOp::PutStateTemporaryFlag(state_root) => { - key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)); + key_value_batch.push(TemporaryFlag.as_kv_store_op(state_root)); } StoreOp::DeleteStateTemporaryFlag(state_root) => { @@ -592,17 +706,21 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); } } + + StoreOp::DeleteExecutionPayload(block_root) => { + let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + } } } Ok(key_value_batch) } pub fn 
do_atomically(&self, batch: Vec>) -> Result<(), Error> { + // Update the block cache whilst holding a lock, to ensure that the cache updates atomically + // with the database. let mut guard = self.block_cache.lock(); - self.hot_db - .do_atomically(self.convert_to_kv_batch(&batch)?)?; - for op in &batch { match op { StoreOp::PutBlock(block_root, block) => { @@ -622,8 +740,15 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::DeleteState(_, _) => (), + + StoreOp::DeleteExecutionPayload(_) => (), } } + + self.hot_db + .do_atomically(self.convert_to_kv_batch(batch)?)?; + drop(guard); + Ok(()) } @@ -887,34 +1012,33 @@ impl, Cold: ItemStore> HotColdDB start_slot: Slot, end_slot: Slot, end_block_hash: Hash256, - ) -> Result>, Error> { - let mut blocks: Vec> = - ParentRootBlockIterator::new(self, end_block_hash) - .map(|result| result.map(|(_, block)| block)) - // Include the block at the end slot (if any), it needs to be - // replayed in order to construct the canonical state at `end_slot`. - .filter(|result| { - result - .as_ref() - .map_or(true, |block| block.slot() <= end_slot) - }) - // Include the block at the start slot (if any). Whilst it doesn't need to be - // applied to the state, it contains a potentially useful state root. - // - // Return `true` on an `Err` so that the `collect` fails, unless the error is a - // `BlockNotFound` error and some blocks are intentionally missing from the DB. - // This complexity is unfortunately necessary to avoid loading the parent of the - // oldest known block -- we can't know that we have all the required blocks until we - // load a block with slot less than the start slot, which is impossible if there are - // no blocks with slot less than the start slot. 
- .take_while(|result| match result { - Ok(block) => block.slot() >= start_slot, - Err(Error::BlockNotFound(_)) => { - self.get_oldest_block_slot() == self.spec.genesis_slot - } - Err(_) => true, - }) - .collect::>()?; + ) -> Result>>, Error> { + let mut blocks = ParentRootBlockIterator::new(self, end_block_hash) + .map(|result| result.map(|(_, block)| block)) + // Include the block at the end slot (if any), it needs to be + // replayed in order to construct the canonical state at `end_slot`. + .filter(|result| { + result + .as_ref() + .map_or(true, |block| block.slot() <= end_slot) + }) + // Include the block at the start slot (if any). Whilst it doesn't need to be + // applied to the state, it contains a potentially useful state root. + // + // Return `true` on an `Err` so that the `collect` fails, unless the error is a + // `BlockNotFound` error and some blocks are intentionally missing from the DB. + // This complexity is unfortunately necessary to avoid loading the parent of the + // oldest known block -- we can't know that we have all the required blocks until we + // load a block with slot less than the start slot, which is impossible if there are + // no blocks with slot less than the start slot. + .take_while(|result| match result { + Ok(block) => block.slot() >= start_slot, + Err(Error::BlockNotFound(_)) => { + self.get_oldest_block_slot() == self.spec.genesis_slot + } + Err(_) => true, + }) + .collect::, _>>()?; blocks.reverse(); Ok(blocks) } @@ -926,7 +1050,7 @@ impl, Cold: ItemStore> HotColdDB fn replay_blocks( &self, state: BeaconState, - blocks: Vec>, + blocks: Vec>>, target_slot: Slot, state_root_iter: Option>>, state_root_strategy: StateRootStrategy, @@ -956,6 +1080,11 @@ impl, Cold: ItemStore> HotColdDB }) } + /// Get a reference to the `ChainSpec` used by the database. + pub fn get_chain_spec(&self) -> &ChainSpec { + &self.spec + } + /// Fetch a copy of the current split slot from memory. 
pub fn get_split_slot(&self) -> Slot { self.split.read_recursive().slot diff --git a/beacon_node/store/src/impls.rs b/beacon_node/store/src/impls.rs index 1b442cbc5..736585a72 100644 --- a/beacon_node/store/src/impls.rs +++ b/beacon_node/store/src/impls.rs @@ -1 +1,2 @@ pub mod beacon_state; +pub mod execution_payload; diff --git a/beacon_node/store/src/impls/execution_payload.rs b/beacon_node/store/src/impls/execution_payload.rs new file mode 100644 index 000000000..ddb9a4462 --- /dev/null +++ b/beacon_node/store/src/impls/execution_payload.rs @@ -0,0 +1,17 @@ +use crate::{DBColumn, Error, StoreItem}; +use ssz::{Decode, Encode}; +use types::{EthSpec, ExecutionPayload}; + +impl StoreItem for ExecutionPayload { + fn db_column() -> DBColumn { + DBColumn::ExecPayload + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +} diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index d5448de98..910979247 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -3,7 +3,8 @@ use crate::{Error, HotColdDB, ItemStore}; use std::borrow::Cow; use std::marker::PhantomData; use types::{ - typenum::Unsigned, BeaconState, BeaconStateError, EthSpec, Hash256, SignedBeaconBlock, Slot, + typenum::Unsigned, BeaconState, BeaconStateError, BlindedPayload, EthSpec, Hash256, + SignedBeaconBlock, Slot, }; /// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over. @@ -188,7 +189,7 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, T, block_hash: Hash256, ) -> Result { let block = store - .get_block(&block_hash)? + .get_blinded_block(&block_hash)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; let state = store .get_state(&block.state_root(), Some(block.slot()))? 
@@ -272,7 +273,10 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } - fn do_next(&mut self) -> Result)>, Error> { + #[allow(clippy::type_complexity)] + fn do_next( + &mut self, + ) -> Result>)>, Error> { // Stop once we reach the zero parent, otherwise we'll keep returning the genesis // block forever. if self.next_block_root.is_zero() { @@ -282,7 +286,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> let block = if self.decode_any_variant { self.store.get_block_any_variant(&block_root) } else { - self.store.get_block(&block_root) + self.store.get_blinded_block(&block_root) }? .ok_or(Error::BlockNotFound(block_root))?; self.next_block_root = block.message().parent_root(); @@ -294,7 +298,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator for ParentRootBlockIterator<'a, E, Hot, Cold> { - type Item = Result<(Hash256, SignedBeaconBlock), Error>; + type Item = Result<(Hash256, SignedBeaconBlock>), Error>; fn next(&mut self) -> Option { self.do_next().transpose() @@ -322,10 +326,10 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, T, } } - fn do_next(&mut self) -> Result>, Error> { + fn do_next(&mut self) -> Result>>, Error> { if let Some(result) = self.roots.next() { let (root, _slot) = result?; - self.roots.inner.store.get_block(&root) + self.roots.inner.store.get_blinded_block(&root) } else { Ok(None) } @@ -335,7 +339,7 @@ impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, T, impl<'a, T: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator for BlockIterator<'a, T, Hot, Cold> { - type Item = Result, Error>; + type Item = Result>, Error>; fn next(&mut self) -> Option { self.do_next().transpose() diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 4a47353cb..86bd4ffac 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -197,6 +197,28 @@ impl 
KeyValueStore for LevelDB { }), ) } + + /// Iterate through all keys and values in a particular column. + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + let start_key = + BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + + let iter = self.db.keys_iter(self.read_options()); + iter.seek(&start_key); + + Box::new( + iter.take_while(move |key| key.matches_column(column)) + .map(move |bytes_key| { + let key = + bytes_key + .remove_column(column) + .ok_or(HotColdDBError::IterationError { + unexpected_key: bytes_key, + })?; + Ok(key) + }), + ) + } } impl ItemStore for LevelDB {} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index bc8f62dd2..613c2e416 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -43,6 +43,7 @@ use strum::{EnumString, IntoStaticStr}; pub use types::*; pub type ColumnIter<'a> = Box), Error>> + 'a>; +pub type ColumnKeyIter<'a> = Box> + 'a>; pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Retrieve some bytes in `column` with `key`. @@ -77,11 +78,17 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Compact the database, freeing space used by deleted items. fn compact(&self) -> Result<(), Error>; - /// Iterate through all values in a particular column. + /// Iterate through all keys and values in a particular column. fn iter_column(&self, _column: DBColumn) -> ColumnIter { // Default impl for non LevelDB databases Box::new(std::iter::empty()) } + + /// Iterate through all keys in a particular column. + fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter { + // Default impl for non LevelDB databases + Box::new(std::iter::empty()) + } } pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { @@ -152,6 +159,7 @@ pub enum StoreOp<'a, E: EthSpec> { DeleteStateTemporaryFlag(Hash256), DeleteBlock(Hash256), DeleteState(Hash256, Option), + DeleteExecutionPayload(Hash256), } /// A unique column identifier. 
@@ -172,6 +180,9 @@ pub enum DBColumn { /// and then made non-temporary by the deletion of their state root from this column. #[strum(serialize = "bst")] BeaconStateTemporary, + /// Execution payloads for blocks more recent than the finalized checkpoint. + #[strum(serialize = "exp")] + ExecPayload, /// For persisting in-memory state to the database. #[strum(serialize = "bch")] BeaconChain, @@ -198,6 +209,12 @@ pub enum DBColumn { DhtEnrs, } +/// A block from the database, which might have an execution payload or not. +pub enum DatabaseBlock { + Full(SignedBeaconBlock), + Blinded(SignedBeaconBlock>), +} + impl DBColumn { pub fn as_str(self) -> &'static str { self.into() diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 78c02a02e..5551f1f44 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(8); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(9); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 6b808974e..7db2652f2 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -76,7 +76,7 @@ where None } else { Some( - self.get_block(&block_root)? + self.get_blinded_block(&block_root)? .ok_or(Error::BlockNotFound(block_root))?, ) }; diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index c876a69be..49510e732 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -563,10 +563,10 @@ where /// The supplied block **must** pass the `state_transition` function as it will not be run /// here. 
#[allow(clippy::too_many_arguments)] - pub fn on_block( + pub fn on_block>( &mut self, current_slot: Slot, - block: &BeaconBlock, + block: &BeaconBlock, block_root: Hash256, block_delay: Duration, state: &BeaconState, diff --git a/consensus/fork_choice/src/fork_choice_store.rs b/consensus/fork_choice/src/fork_choice_store.rs index 9b85708f3..782600751 100644 --- a/consensus/fork_choice/src/fork_choice_store.rs +++ b/consensus/fork_choice/src/fork_choice_store.rs @@ -1,4 +1,4 @@ -use types::{BeaconBlock, BeaconState, Checkpoint, EthSpec, Hash256, Slot}; +use types::{BeaconBlock, BeaconState, Checkpoint, EthSpec, ExecPayload, Hash256, Slot}; /// Approximates the `Store` in "Ethereum 2.0 Phase 0 -- Beacon Chain Fork Choice": /// @@ -31,9 +31,9 @@ pub trait ForkChoiceStore: Sized { /// Called whenever `ForkChoice::on_block` has verified a block, but not yet added it to fork /// choice. Allows the implementer to performing caching or other housekeeping duties. - fn on_verified_block( + fn on_verified_block>( &mut self, - block: &BeaconBlock, + block: &BeaconBlock, block_root: Hash256, state: &BeaconState, ) -> Result<(), Self::Error>; diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 160800ca5..3f8a2ac6b 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -344,7 +344,7 @@ impl ForkChoiceTest { let state_root = harness .chain .store - .get_block(&fc.fc_store().justified_checkpoint().root) + .get_blinded_block(&fc.fc_store().justified_checkpoint().root) .unwrap() .unwrap() .message() diff --git a/consensus/state_processing/src/block_replayer.rs b/consensus/state_processing/src/block_replayer.rs index 937348263..d4b4b067e 100644 --- a/consensus/state_processing/src/block_replayer.rs +++ b/consensus/state_processing/src/block_replayer.rs @@ -3,10 +3,12 @@ use crate::{ BlockProcessingError, BlockSignatureStrategy, SlotProcessingError, VerifyBlockRoot, }; use std::marker::PhantomData; 
-use types::{BeaconState, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{BeaconState, BlindedPayload, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, Slot}; -type PreBlockHook<'a, E, Error> = - Box, &SignedBeaconBlock) -> Result<(), Error> + 'a>; +type PreBlockHook<'a, E, Error> = Box< + dyn FnMut(&mut BeaconState, &SignedBeaconBlock>) -> Result<(), Error> + + 'a, +>; type PostBlockHook<'a, E, Error> = PreBlockHook<'a, E, Error>; type PreSlotHook<'a, E, Error> = Box) -> Result<(), Error> + 'a>; type PostSlotHook<'a, E, Error> = Box< @@ -175,7 +177,7 @@ where fn get_state_root( &mut self, slot: Slot, - blocks: &[SignedBeaconBlock], + blocks: &[SignedBeaconBlock>], i: usize, ) -> Result, Error> { // If we don't care about state roots then return immediately. @@ -214,7 +216,7 @@ where /// after the blocks have been applied. pub fn apply_blocks( mut self, - blocks: Vec>, + blocks: Vec>>, target_slot: Option, ) -> Result { for (i, block) in blocks.iter().enumerate() { diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index 6350d0775..881d17a33 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -43,7 +43,7 @@ regex = "1.5.5" lazy_static = "1.4.0" parking_lot = "0.12.0" itertools = "0.10.0" -superstruct = "0.4.1" +superstruct = "0.5.0" serde_json = "1.0.74" smallvec = "1.8.0" diff --git a/consensus/types/src/beacon_block.rs b/consensus/types/src/beacon_block.rs index 7f83d46dd..6eb12ddf0 100644 --- a/consensus/types/src/beacon_block.rs +++ b/consensus/types/src/beacon_block.rs @@ -37,7 +37,9 @@ use tree_hash_derive::TreeHash; ref_attributes( derive(Debug, PartialEq, TreeHash), tree_hash(enum_behaviour = "transparent") - ) + ), + map_ref_into(BeaconBlockBodyRef), + map_ref_mut_into(BeaconBlockBodyRefMut) )] #[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, Derivative)] #[derivative(PartialEq, Hash(bound = "T: EthSpec"))] @@ -199,20 +201,17 @@ impl<'a, T: EthSpec, Payload: ExecPayload> 
BeaconBlockRef<'a, T, Payload> { /// Convenience accessor for the `body` as a `BeaconBlockBodyRef`. pub fn body(&self) -> BeaconBlockBodyRef<'a, T, Payload> { - match self { - BeaconBlockRef::Base(block) => BeaconBlockBodyRef::Base(&block.body), - BeaconBlockRef::Altair(block) => BeaconBlockBodyRef::Altair(&block.body), - BeaconBlockRef::Merge(block) => BeaconBlockBodyRef::Merge(&block.body), - } + map_beacon_block_ref_into_beacon_block_body_ref!(&'a _, *self, |block, cons| cons( + &block.body + )) } /// Return the tree hash root of the block's body. pub fn body_root(&self) -> Hash256 { - match self { - BeaconBlockRef::Base(block) => block.body.tree_hash_root(), - BeaconBlockRef::Altair(block) => block.body.tree_hash_root(), - BeaconBlockRef::Merge(block) => block.body.tree_hash_root(), - } + map_beacon_block_ref!(&'a _, *self, |block, cons| { + let _: Self = cons(block); + block.body.tree_hash_root() + }) } /// Returns the epoch corresponding to `self.slot()`. @@ -249,11 +248,9 @@ impl<'a, T: EthSpec, Payload: ExecPayload> BeaconBlockRef<'a, T, Payload> { impl<'a, T: EthSpec, Payload: ExecPayload> BeaconBlockRefMut<'a, T, Payload> { /// Convert a mutable reference to a beacon block to a mutable ref to its body. pub fn body_mut(self) -> BeaconBlockBodyRefMut<'a, T, Payload> { - match self { - BeaconBlockRefMut::Base(block) => BeaconBlockBodyRefMut::Base(&mut block.body), - BeaconBlockRefMut::Altair(block) => BeaconBlockBodyRefMut::Altair(&mut block.body), - BeaconBlockRefMut::Merge(block) => BeaconBlockBodyRefMut::Merge(&mut block.body), - } + map_beacon_block_ref_mut_into_beacon_block_body_ref_mut!(&'a _, self, |block, cons| cons( + &mut block.body + )) } } @@ -465,6 +462,99 @@ impl> BeaconBlockMerge { } } +// We can convert pre-Bellatrix blocks without payloads into blocks "with" payloads. 
+impl From>> + for BeaconBlockBase> +{ + fn from(block: BeaconBlockBase>) -> Self { + let BeaconBlockBase { + slot, + proposer_index, + parent_root, + state_root, + body, + } = block; + + BeaconBlockBase { + slot, + proposer_index, + parent_root, + state_root, + body: body.into(), + } + } +} + +impl From>> + for BeaconBlockAltair> +{ + fn from(block: BeaconBlockAltair>) -> Self { + let BeaconBlockAltair { + slot, + proposer_index, + parent_root, + state_root, + body, + } = block; + + BeaconBlockAltair { + slot, + proposer_index, + parent_root, + state_root, + body: body.into(), + } + } +} + +// We can convert blocks with payloads to blocks without payloads, and an optional payload. +macro_rules! impl_from { + ($ty_name:ident, <$($from_params:ty),*>, <$($to_params:ty),*>, $body_expr:expr) => { + impl From<$ty_name<$($from_params),*>> + for ($ty_name<$($to_params),*>, Option>) + { + #[allow(clippy::redundant_closure_call)] + fn from(block: $ty_name<$($from_params),*>) -> Self { + let $ty_name { + slot, + proposer_index, + parent_root, + state_root, + body, + } = block; + + let (body, payload) = ($body_expr)(body); + + ($ty_name { + slot, + proposer_index, + parent_root, + state_root, + body, + }, payload) + } + } + } +} + +impl_from!(BeaconBlockBase, >, >, |body: BeaconBlockBodyBase<_, _>| body.into()); +impl_from!(BeaconBlockAltair, >, >, |body: BeaconBlockBodyAltair<_, _>| body.into()); +impl_from!(BeaconBlockMerge, >, >, |body: BeaconBlockBodyMerge<_, _>| body.into()); + +impl From>> + for ( + BeaconBlock>, + Option>, + ) +{ + fn from(block: BeaconBlock>) -> Self { + map_beacon_block!(block, |inner, cons| { + let (block, payload) = inner.into(); + (cons(block), payload) + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/types/src/beacon_block_body.rs b/consensus/types/src/beacon_block_body.rs index c1db64ae0..34761ea9a 100644 --- a/consensus/types/src/beacon_block_body.rs +++ b/consensus/types/src/beacon_block_body.rs @@ -73,6 +73,198 @@ 
impl<'a, T: EthSpec> BeaconBlockBodyRef<'a, T> { } } +// We can convert pre-Bellatrix block bodies without payloads into block bodies "with" payloads. +impl From>> + for BeaconBlockBodyBase> +{ + fn from(body: BeaconBlockBodyBase>) -> Self { + let BeaconBlockBodyBase { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + _phantom, + } = body; + + BeaconBlockBodyBase { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + _phantom: PhantomData, + } + } +} + +impl From>> + for BeaconBlockBodyAltair> +{ + fn from(body: BeaconBlockBodyAltair>) -> Self { + let BeaconBlockBodyAltair { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + _phantom, + } = body; + + BeaconBlockBodyAltair { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + _phantom: PhantomData, + } + } +} + +// Likewise bodies with payloads can be transformed into bodies without. 
+impl From>> + for ( + BeaconBlockBodyBase>, + Option>, + ) +{ + fn from(body: BeaconBlockBodyBase>) -> Self { + let BeaconBlockBodyBase { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + _phantom, + } = body; + + ( + BeaconBlockBodyBase { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + _phantom: PhantomData, + }, + None, + ) + } +} + +impl From>> + for ( + BeaconBlockBodyAltair>, + Option>, + ) +{ + fn from(body: BeaconBlockBodyAltair>) -> Self { + let BeaconBlockBodyAltair { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + _phantom, + } = body; + + ( + BeaconBlockBodyAltair { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + _phantom: PhantomData, + }, + None, + ) + } +} + +impl From>> + for ( + BeaconBlockBodyMerge>, + Option>, + ) +{ + fn from(body: BeaconBlockBodyMerge>) -> Self { + let BeaconBlockBodyMerge { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + execution_payload: FullPayload { execution_payload }, + } = body; + + ( + BeaconBlockBodyMerge { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + execution_payload: BlindedPayload { + execution_payload_header: From::from(&execution_payload), + }, + }, + Some(execution_payload), + ) + } +} + +impl From>> + for ( + BeaconBlockBody>, + Option>, + ) +{ + fn from(body: BeaconBlockBody>) -> Self { + map_beacon_block_body!(body, |inner, cons| { + let (block, payload) = inner.into(); + (cons(block), payload) + }) + } +} + #[cfg(test)] mod 
tests { mod base { diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 908419cdd..22e429a58 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -139,7 +139,7 @@ pub use crate::shuffling_id::AttestationShufflingId; pub use crate::signed_aggregate_and_proof::SignedAggregateAndProof; pub use crate::signed_beacon_block::{ SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockHash, - SignedBeaconBlockMerge, + SignedBeaconBlockMerge, SignedBlindedBeaconBlock, }; pub use crate::signed_beacon_block_header::SignedBeaconBlockHeader; pub use crate::signed_contribution_and_proof::SignedContributionAndProof; diff --git a/consensus/types/src/payload.rs b/consensus/types/src/payload.rs index 5be603c19..d736f0be1 100644 --- a/consensus/types/src/payload.rs +++ b/consensus/types/src/payload.rs @@ -15,7 +15,9 @@ pub enum BlockType { } pub trait ExecPayload: - Encode + Debug + + Clone + + Encode + Decode + TestRandom + TreeHash @@ -37,6 +39,7 @@ pub trait ExecPayload: // More fields can be added here if you wish. 
fn parent_hash(&self) -> ExecutionBlockHash; fn prev_randao(&self) -> Hash256; + fn block_number(&self) -> u64; fn timestamp(&self) -> u64; fn block_hash(&self) -> ExecutionBlockHash; } @@ -58,6 +61,10 @@ impl ExecPayload for FullPayload { self.execution_payload.prev_randao } + fn block_number(&self) -> u64 { + self.execution_payload.block_number + } + fn timestamp(&self) -> u64 { self.execution_payload.timestamp } @@ -84,6 +91,10 @@ impl ExecPayload for BlindedPayload { self.execution_payload_header.prev_randao } + fn block_number(&self) -> u64 { + self.execution_payload_header.block_number + } + fn timestamp(&self) -> u64 { self.execution_payload_header.timestamp } @@ -93,13 +104,28 @@ impl ExecPayload for BlindedPayload { } } -#[derive(Default, Debug, Clone, TestRandom, Serialize, Deserialize, Derivative)] +#[derive(Debug, Clone, TestRandom, Serialize, Deserialize, Derivative)] #[derivative(PartialEq, Hash(bound = "T: EthSpec"))] #[serde(bound = "T: EthSpec")] pub struct BlindedPayload { pub execution_payload_header: ExecutionPayloadHeader, } +// NOTE: the `Default` implementation for `BlindedPayload` needs to be different from the `Default` +// implementation for `ExecutionPayloadHeader` because payloads are checked for equality against the +// default payload in `is_merge_transition_block` to determine whether the merge has occurred. +// +// The default `BlindedPayload` is therefore the payload header that results from blinding the +// default `ExecutionPayload`, which differs from the default `ExecutionPayloadHeader` in that +// its `transactions_root` is the hash of the empty list rather than 0x0. 
+impl Default for BlindedPayload { + fn default() -> Self { + Self { + execution_payload_header: ExecutionPayloadHeader::from(&ExecutionPayload::default()), + } + } +} + impl From> for BlindedPayload { fn from(execution_payload_header: ExecutionPayloadHeader) -> Self { Self { diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index da191dbff..548807068 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -53,7 +53,10 @@ impl From for Hash256 { derivative(PartialEq, Hash(bound = "E: EthSpec")), cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary)), serde(bound = "E: EthSpec, Payload: ExecPayload"), - ) + ), + map_into(BeaconBlock), + map_ref_into(BeaconBlockRef), + map_ref_mut_into(BeaconBlockRefMut) )] #[derive(Debug, Clone, Serialize, Deserialize, Encode, TreeHash, Derivative)] #[derivative(PartialEq, Hash(bound = "E: EthSpec"))] @@ -72,6 +75,8 @@ pub struct SignedBeaconBlock = FullPayload = SignedBeaconBlock>; + impl> SignedBeaconBlock { /// Returns the name of the fork pertaining to `self`. /// @@ -132,31 +137,27 @@ impl> SignedBeaconBlock { /// This is necessary to get a `&BeaconBlock` from a `SignedBeaconBlock` because /// `SignedBeaconBlock` only contains a `BeaconBlock` _variant_. pub fn deconstruct(self) -> (BeaconBlock, Signature) { - match self { - SignedBeaconBlock::Base(block) => (BeaconBlock::Base(block.message), block.signature), - SignedBeaconBlock::Altair(block) => { - (BeaconBlock::Altair(block.message), block.signature) - } - SignedBeaconBlock::Merge(block) => (BeaconBlock::Merge(block.message), block.signature), - } + map_signed_beacon_block_into_beacon_block!(self, |block, beacon_block_cons| { + (beacon_block_cons(block.message), block.signature) + }) } /// Accessor for the block's `message` field as a ref. 
- pub fn message(&self) -> BeaconBlockRef<'_, E, Payload> { - match self { - SignedBeaconBlock::Base(inner) => BeaconBlockRef::Base(&inner.message), - SignedBeaconBlock::Altair(inner) => BeaconBlockRef::Altair(&inner.message), - SignedBeaconBlock::Merge(inner) => BeaconBlockRef::Merge(&inner.message), - } + pub fn message<'a>(&'a self) -> BeaconBlockRef<'a, E, Payload> { + map_signed_beacon_block_ref_into_beacon_block_ref!( + &'a _, + self.to_ref(), + |inner, cons| cons(&inner.message) + ) } /// Accessor for the block's `message` as a mutable reference (for testing only). - pub fn message_mut(&mut self) -> BeaconBlockRefMut<'_, E, Payload> { - match self { - SignedBeaconBlock::Base(inner) => BeaconBlockRefMut::Base(&mut inner.message), - SignedBeaconBlock::Altair(inner) => BeaconBlockRefMut::Altair(&mut inner.message), - SignedBeaconBlock::Merge(inner) => BeaconBlockRefMut::Merge(&mut inner.message), - } + pub fn message_mut<'a>(&'a mut self) -> BeaconBlockRefMut<'a, E, Payload> { + map_signed_beacon_block_ref_mut_into_beacon_block_ref_mut!( + &'a _, + self.to_mut(), + |inner, cons| cons(&mut inner.message) + ) } /// Verify `self.signature`. @@ -225,3 +226,165 @@ impl> SignedBeaconBlock { self.message().tree_hash_root() } } + +// We can convert pre-Bellatrix blocks without payloads into blocks with payloads. +impl From>> + for SignedBeaconBlockBase> +{ + fn from(signed_block: SignedBeaconBlockBase>) -> Self { + let SignedBeaconBlockBase { message, signature } = signed_block; + SignedBeaconBlockBase { + message: message.into(), + signature, + } + } +} + +impl From>> + for SignedBeaconBlockAltair> +{ + fn from(signed_block: SignedBeaconBlockAltair>) -> Self { + let SignedBeaconBlockAltair { message, signature } = signed_block; + SignedBeaconBlockAltair { + message: message.into(), + signature, + } + } +} + +// Post-Bellatrix blocks can be "unblinded" by adding the full payload. 
+// NOTE: It might be nice to come up with a `superstruct` pattern to abstract over this before +// the first fork after Bellatrix. +impl SignedBeaconBlockMerge> { + pub fn into_full_block( + self, + execution_payload: ExecutionPayload, + ) -> SignedBeaconBlockMerge> { + let SignedBeaconBlockMerge { + message: + BeaconBlockMerge { + slot, + proposer_index, + parent_root, + state_root, + body: + BeaconBlockBodyMerge { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + execution_payload: BlindedPayload { .. }, + }, + }, + signature, + } = self; + SignedBeaconBlockMerge { + message: BeaconBlockMerge { + slot, + proposer_index, + parent_root, + state_root, + body: BeaconBlockBodyMerge { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + execution_payload: FullPayload { execution_payload }, + }, + }, + signature, + } + } +} + +impl SignedBeaconBlock> { + pub fn try_into_full_block( + self, + execution_payload: Option>, + ) -> Option>> { + let full_block = match self { + SignedBeaconBlock::Base(block) => SignedBeaconBlock::Base(block.into()), + SignedBeaconBlock::Altair(block) => SignedBeaconBlock::Altair(block.into()), + SignedBeaconBlock::Merge(block) => { + SignedBeaconBlock::Merge(block.into_full_block(execution_payload?)) + } + }; + Some(full_block) + } +} + +// We can blind blocks with payloads by converting the payload into a header. +// +// We can optionally keep the header, or discard it. 
+impl From> + for (SignedBlindedBeaconBlock, Option>) +{ + fn from(signed_block: SignedBeaconBlock) -> Self { + let (block, signature) = signed_block.deconstruct(); + let (blinded_block, payload) = block.into(); + ( + SignedBeaconBlock::from_block(blinded_block, signature), + payload, + ) + } +} + +impl From> for SignedBlindedBeaconBlock { + fn from(signed_block: SignedBeaconBlock) -> Self { + let (blinded_block, _) = signed_block.into(); + blinded_block + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn add_remove_payload_roundtrip() { + type E = MainnetEthSpec; + + let spec = &E::default_spec(); + let sig = Signature::empty(); + let blocks = vec![ + SignedBeaconBlock::::from_block( + BeaconBlock::Base(BeaconBlockBase::empty(spec)), + sig.clone(), + ), + SignedBeaconBlock::from_block( + BeaconBlock::Altair(BeaconBlockAltair::empty(spec)), + sig.clone(), + ), + SignedBeaconBlock::from_block(BeaconBlock::Merge(BeaconBlockMerge::empty(spec)), sig), + ]; + + for block in blocks { + let (blinded_block, payload): (SignedBlindedBeaconBlock, _) = block.clone().into(); + assert_eq!(blinded_block.tree_hash_root(), block.tree_hash_root()); + + if let Some(payload) = &payload { + assert_eq!( + payload.tree_hash_root(), + block + .message() + .execution_payload() + .unwrap() + .tree_hash_root() + ); + } + + let reconstructed = blinded_block.try_into_full_block(payload).unwrap(); + assert_eq!(reconstructed, block); + } + } +} diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 9744434f5..92c28aeb0 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -338,7 +338,11 @@ impl Tester { // function. if !valid { // A missing parent block whilst `valid == false` means the test should pass. 
- if let Some(parent_block) = self.harness.chain.get_block(&block.parent_root()).unwrap() + if let Some(parent_block) = self + .harness + .chain + .get_blinded_block(&block.parent_root()) + .unwrap() { let parent_state_root = parent_block.state_root(); let mut state = self diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 95751d1a8..79661354d 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -5,8 +5,8 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use task_executor::TaskExecutor; use tokio::time::sleep; use types::{ - Address, ChainSpec, EthSpec, ExecutionBlockHash, FullPayload, Hash256, MainnetEthSpec, Slot, - Uint256, + Address, ChainSpec, EthSpec, ExecutionBlockHash, ExecutionPayload, FullPayload, Hash256, + MainnetEthSpec, Slot, Uint256, }; const EXECUTION_ENGINE_START_TIMEOUT: Duration = Duration::from_secs(10); @@ -214,6 +214,7 @@ impl TestRig { .await .unwrap(); assert_eq!(status, PayloadStatus::Valid); + check_payload_reconstruction(&self.ee_a, &valid_payload).await; /* * Execution Engine A: @@ -288,6 +289,7 @@ impl TestRig { .await .unwrap(); assert_eq!(status, PayloadStatus::Valid); + check_payload_reconstruction(&self.ee_a, &second_payload).await; /* * Execution Engine A: @@ -359,6 +361,7 @@ impl TestRig { .await .unwrap(); assert_eq!(status, PayloadStatus::Valid); + check_payload_reconstruction(&self.ee_b, &valid_payload).await; /* * Execution Engine B: @@ -372,6 +375,7 @@ impl TestRig { .await .unwrap(); assert_eq!(status, PayloadStatus::Valid); + check_payload_reconstruction(&self.ee_b, &second_payload).await; /* * Execution Engine B: @@ -392,6 +396,22 @@ impl TestRig { } } +/// Check that the given payload can be re-constructed by fetching it from the EE. +/// +/// Panic if payload reconstruction fails. 
+async fn check_payload_reconstruction( + ee: &ExecutionPair, + payload: &ExecutionPayload, +) { + let reconstructed = ee + .execution_layer + .get_payload_by_block_hash(payload.block_hash) + .await + .unwrap() + .unwrap(); + assert_eq!(reconstructed, *payload); +} + /// Returns the duration since the unix epoch. pub fn timestamp_now() -> u64 { SystemTime::now()