feat(store/v2): add WorkingHash (#20706)
This commit is contained in:
parent
5f2bcfc416
commit
be0e2eef0c
@ -26,6 +26,14 @@ type Store interface {
|
||||
// state. Must error when the version does not exist.
|
||||
StateAt(version uint64) (store.ReaderMap, error)
|
||||
|
||||
// SetInitialVersion sets the initial version of the store.
|
||||
SetInitialVersion(uint64) error
|
||||
|
||||
// WorkingHash writes the provided changeset to the state and returns
|
||||
// the working hash of the state.
|
||||
WorkingHash(changeset *store.Changeset) (store.Hash, error)
|
||||
|
||||
// Commit commits the provided changeset and returns the new state root of the state.
|
||||
Commit(changeset *store.Changeset) (store.Hash, error)
|
||||
|
||||
// Query is a key/value query directly to the underlying database. This skips the appmanager
|
||||
|
||||
@ -23,9 +23,6 @@ jq '.app_state.gov.voting_params.voting_period = "600s"' genesis.json > temp.jso
|
||||
# to change the inflation
|
||||
jq '.app_state.mint.minter.inflation = "0.300000000000000000"' genesis.json > temp.json && mv temp.json genesis.json
|
||||
|
||||
# change the initial height to 2 to work around store/v2 and iavl limitations with a genesis block
|
||||
jq '.initial_height = 2' genesis.json > temp.json && mv temp.json genesis.json
|
||||
|
||||
$SIMD config set client chain-id simapp-v2-chain
|
||||
$SIMD keys add test_validator --indiscreet
|
||||
VALIDATOR_ADDRESS=$($SIMD keys show test_validator -a --keyring-backend test)
|
||||
|
||||
@ -237,6 +237,8 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
|
||||
|
||||
// store chainID to be used later on in execution
|
||||
c.chainID = req.ChainId
|
||||
// TODO: check if we need to load the config from genesis.json or config.toml
|
||||
c.cfg.InitialHeight = uint64(req.InitialHeight)
|
||||
|
||||
// On a new chain, we consider the init chain block height as 0, even though
|
||||
// req.InitialHeight is 1 by default.
|
||||
@ -281,6 +283,11 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
|
||||
|
||||
validatorUpdates := intoABCIValidatorUpdates(blockresponse.ValidatorUpdates)
|
||||
|
||||
// set the initial version of the store
|
||||
if err := c.store.SetInitialVersion(uint64(req.InitialHeight)); err != nil {
|
||||
return nil, fmt.Errorf("failed to set initial version: %w", err)
|
||||
}
|
||||
|
||||
stateChanges, err := genesisState.GetStateChanges()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -288,9 +295,9 @@ func (c *Consensus[T]) InitChain(ctx context.Context, req *abciproto.InitChainRe
|
||||
cs := &store.Changeset{
|
||||
Changes: stateChanges,
|
||||
}
|
||||
stateRoot, err := c.store.Commit(cs)
|
||||
stateRoot, err := c.store.WorkingHash(cs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
|
||||
return nil, fmt.Errorf("unable to write the changeset: %w", err)
|
||||
}
|
||||
|
||||
return &abci.InitChainResponse{
|
||||
@ -414,6 +421,21 @@ func (c *Consensus[T]) FinalizeBlock(
|
||||
// LastCommit: sdktypes.ToSDKCommitInfo(req.DecidedLastCommit),
|
||||
// })
|
||||
|
||||
// we don't need to deliver the block in the genesis block
|
||||
if req.Height == int64(c.cfg.InitialHeight) {
|
||||
appHash, err := c.store.Commit(store.NewChangeset())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to commit the changeset: %w", err)
|
||||
}
|
||||
c.lastCommittedBlock.Store(&BlockData{
|
||||
Height: req.Height,
|
||||
Hash: appHash,
|
||||
})
|
||||
return &abciproto.FinalizeBlockResponse{
|
||||
AppHash: appHash,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TODO(tip): can we expect some txs to not decode? if so, what we do in this case? this does not seem to be the case,
|
||||
// considering that prepare and process always decode txs, assuming they're the ones providing txs we should never
|
||||
// have a tx that fails decoding.
|
||||
|
||||
@ -14,6 +14,13 @@ type Store interface {
|
||||
// associated with it.
|
||||
StateLatest() (uint64, store.ReaderMap, error)
|
||||
|
||||
// SetInitialVersion sets the initial version of the store.
|
||||
SetInitialVersion(uint64) error
|
||||
|
||||
// WorkingHash writes the provided changeset to the state and returns
|
||||
// the working hash of the state.
|
||||
WorkingHash(*store.Changeset) (store.Hash, error)
|
||||
|
||||
// Commit commits the provided changeset and returns
|
||||
// the new state root of the state.
|
||||
Commit(*store.Changeset) (store.Hash, error)
|
||||
|
||||
@ -113,19 +113,6 @@ func (c *CommitStore) GetLatestVersion() (uint64, error) {
|
||||
return version, nil
|
||||
}
|
||||
|
||||
// IsEmpty returns true if the CommitStore is empty.
|
||||
func (c *CommitStore) IsEmpty() (bool, error) {
|
||||
value, err := c.db.Get([]byte(latestVersionKey))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if value == nil {
|
||||
return true, nil
|
||||
} else {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CommitStore) LoadVersion(targetVersion uint64) error {
|
||||
// Rollback the metadata to the target version.
|
||||
latestVersion, err := c.GetLatestVersion()
|
||||
@ -219,18 +206,18 @@ func (c *CommitStore) Commit(version uint64) (*proof.CommitInfo, error) {
|
||||
// If a commit event execution is interrupted, a new iavl store's version
|
||||
// will be larger than the RMS's metadata, when the block is replayed, we
|
||||
// should avoid committing that iavl store again.
|
||||
var (
|
||||
commitID proof.CommitID
|
||||
latestVersion = tree.GetLatestVersion()
|
||||
)
|
||||
if latestVersion != 0 && latestVersion >= version {
|
||||
var commitID proof.CommitID
|
||||
if tree.GetLatestVersion() >= version {
|
||||
commitID.Version = version
|
||||
commitID.Hash = tree.Hash()
|
||||
} else {
|
||||
hash, version, err := tree.Commit()
|
||||
hash, cversion, err := tree.Commit()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cversion != version {
|
||||
return nil, fmt.Errorf("commit version %d does not match the target version %d", cversion, version)
|
||||
}
|
||||
commitID = proof.CommitID{
|
||||
Version: version,
|
||||
Hash: hash,
|
||||
|
||||
@ -57,9 +57,6 @@ type Committer interface {
|
||||
// GetCommitInfo returns the CommitInfo for the given version.
|
||||
GetCommitInfo(version uint64) (*proof.CommitInfo, error)
|
||||
|
||||
// IsEmpty returns true if the database is empty.
|
||||
IsEmpty() (bool, error)
|
||||
|
||||
// Close releases associated resources. It should NOT be idempotent. It must
|
||||
// only be called once and any call after may panic.
|
||||
io.Closer
|
||||
|
||||
@ -35,7 +35,8 @@ type Store struct {
|
||||
// stateCommitment reflects the state commitment (SC) backend
|
||||
stateCommitment store.Committer
|
||||
|
||||
// commitHeader reflects the header used when committing state (note, this isn't required and only used for query purposes)
|
||||
// commitHeader reflects the header used when committing state
|
||||
// note, this isn't required and only used for query purposes)
|
||||
commitHeader *coreheader.Info
|
||||
|
||||
// lastCommitInfo reflects the last version/hash that has been committed
|
||||
@ -261,6 +262,40 @@ func (s *Store) SetCommitHeader(h *coreheader.Info) {
|
||||
s.commitHeader = h
|
||||
}
|
||||
|
||||
// WorkingHash writes the changeset to SC and SS and returns the workingHash
|
||||
// of the CommitInfo.
|
||||
func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) {
|
||||
if s.telemetry != nil {
|
||||
now := time.Now()
|
||||
defer s.telemetry.MeasureSince(now, "root_store", "working_hash")
|
||||
}
|
||||
|
||||
// write the changeset to the SC and SS backends
|
||||
eg := new(errgroup.Group)
|
||||
eg.Go(func() error {
|
||||
if err := s.writeSC(cs); err != nil {
|
||||
return fmt.Errorf("failed to write SC: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
eg.Go(func() error {
|
||||
if err := s.stateStorage.ApplyChangeset(s.initialVersion, cs); err != nil {
|
||||
return fmt.Errorf("failed to commit SS: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err := eg.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
workingHash := s.lastCommitInfo.Hash()
|
||||
s.lastCommitInfo.Version -= 1 // reset lastCommitInfo to allow Commit() to work correctly
|
||||
|
||||
return workingHash, nil
|
||||
}
|
||||
|
||||
// Commit commits all state changes to the underlying SS and SC backends. It
|
||||
// writes a batch of the changeset to the SC tree, and retrieves the CommitInfo
|
||||
// from the SC tree. Finally, it commits the SC tree and returns the hash of the
|
||||
@ -392,13 +427,8 @@ func (s *Store) writeSC(cs *corestore.Changeset) error {
|
||||
return fmt.Errorf("failed to write batch to SC store: %w", err)
|
||||
}
|
||||
|
||||
isEmpty, err := s.stateCommitment.IsEmpty()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check if SC store is empty: %w", err)
|
||||
}
|
||||
|
||||
var previousHeight, version uint64
|
||||
if isEmpty {
|
||||
if s.lastCommitInfo.GetVersion() == 0 && s.initialVersion > 1 {
|
||||
// This case means that no commit has been made in the store, we
|
||||
// start from initialVersion.
|
||||
version = s.initialVersion
|
||||
|
||||
@ -112,7 +112,38 @@ func (s *RootStoreTestSuite) TestGetStateStorage() {
|
||||
}
|
||||
|
||||
func (s *RootStoreTestSuite) TestSetInitialVersion() {
|
||||
s.Require().NoError(s.rootStore.SetInitialVersion(100))
|
||||
initialVersion := uint64(5)
|
||||
s.Require().NoError(s.rootStore.SetInitialVersion(initialVersion))
|
||||
|
||||
// perform the initial commit
|
||||
cs := corestore.NewChangeset()
|
||||
cs.Add(testStoreKeyBytes, []byte("foo"), []byte("bar"), false)
|
||||
|
||||
wHash, err := s.rootStore.WorkingHash(cs)
|
||||
s.Require().NoError(err)
|
||||
cHash, err := s.rootStore.Commit(corestore.NewChangeset())
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(wHash, cHash)
|
||||
|
||||
// check the latest version
|
||||
lVersion, err := s.rootStore.GetLatestVersion()
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(initialVersion, lVersion)
|
||||
|
||||
// set the initial version again
|
||||
rInitialVersion := uint64(100)
|
||||
s.Require().NoError(s.rootStore.SetInitialVersion(rInitialVersion))
|
||||
|
||||
// perform the commit
|
||||
cs = corestore.NewChangeset()
|
||||
cs.Add(testStoreKey2Bytes, []byte("foo"), []byte("bar"), false)
|
||||
_, err = s.rootStore.Commit(cs)
|
||||
s.Require().NoError(err)
|
||||
lVersion, err = s.rootStore.GetLatestVersion()
|
||||
s.Require().NoError(err)
|
||||
// SetInitialVersion only works once
|
||||
s.Require().NotEqual(rInitialVersion, lVersion)
|
||||
s.Require().Equal(initialVersion+1, lVersion)
|
||||
}
|
||||
|
||||
func (s *RootStoreTestSuite) TestSetCommitHeader() {
|
||||
@ -348,6 +379,29 @@ func (s *RootStoreTestSuite) TestStateAt() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RootStoreTestSuite) TestWorkingHash() {
|
||||
// write keys over multiple versions
|
||||
for v := uint64(1); v <= 5; v++ {
|
||||
// perform changes
|
||||
cs := corestore.NewChangeset()
|
||||
for _, storeKeyBytes := range [][]byte{testStoreKeyBytes, testStoreKey2Bytes, testStoreKey3Bytes} {
|
||||
for i := 0; i < 100; i++ {
|
||||
key := fmt.Sprintf("key_%x_%03d", i, storeKeyBytes) // key000, key001, ..., key099
|
||||
val := fmt.Sprintf("val%03d_%03d", i, v) // val000_1, val001_1, ..., val099_1
|
||||
|
||||
cs.Add(storeKeyBytes, []byte(key), []byte(val), false)
|
||||
}
|
||||
}
|
||||
|
||||
wHash, err := s.rootStore.WorkingHash(cs)
|
||||
s.Require().NoError(err)
|
||||
// execute Commit with empty changeset
|
||||
cHash, err := s.rootStore.Commit(corestore.NewChangeset())
|
||||
s.Require().NoError(err)
|
||||
s.Require().Equal(wHash, cHash)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RootStoreTestSuite) TestPrune() {
|
||||
// perform changes
|
||||
cs := corestore.NewChangeset()
|
||||
|
||||
@ -49,11 +49,16 @@ type RootStore interface {
|
||||
// queries based on block time need to be supported.
|
||||
SetCommitHeader(h *coreheader.Info)
|
||||
|
||||
// WorkingHash returns the current WIP commitment hash by applying the Changeset
|
||||
// to the SC backend. It is only used to get the hash of the intermediate state
|
||||
// before committing, the typical use case is for the genesis block.
|
||||
// NOTE: It also writes the changeset to the SS backend.
|
||||
WorkingHash(cs *corestore.Changeset) ([]byte, error)
|
||||
|
||||
// Commit should be responsible for taking the provided changeset and flushing
|
||||
// it to disk. Note, depending on the implementation, the changeset, at this
|
||||
// point, may already be written to the SC backends. Commit() should ensure
|
||||
// the changeset is committed to all SC and SC backends and flushed to disk.
|
||||
// It must return a hash of the merkle-ized committed state.
|
||||
// it to disk. Note, it will overwrite the changeset if WorkingHash() was called.
|
||||
// Commit() should ensure the changeset is committed to all SC and SS backends
|
||||
// and flushed to disk. It must return a hash of the merkle-ized committed state.
|
||||
Commit(cs *corestore.Changeset) ([]byte, error)
|
||||
|
||||
// LastCommitID returns a CommitID pertaining to the last commitment.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user