From 3474efc599ac2c77c870c82d0c19386c23570637 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 1 Dec 2017 08:52:54 -0800 Subject: [PATCH] Cleanup multistore --- store/cachemultistore.go | 70 +++++++++ store/multistore.go | 298 ++++++++++++++++++++------------------- store/types.go | 6 +- 3 files changed, 228 insertions(+), 146 deletions(-) create mode 100644 store/cachemultistore.go diff --git a/store/cachemultistore.go b/store/cachemultistore.go new file mode 100644 index 0000000000..3c9ae7d443 --- /dev/null +++ b/store/cachemultistore.go @@ -0,0 +1,70 @@ +package store + +import dbm "github.com/tendermint/tmlibs/db" + +//---------------------------------------- +// cacheMultiStore + +type cwWriter interface { + Write() +} + +// cacheMultiStore holds many CacheWrap'd stores. +// Implements MultiStore. +type cacheMultiStore struct { + db dbm.DB + version int64 + lastCommitID CommitID + substores map[string]cwWriter +} + +func newCacheMultiStore(rs *rootMultiStore) cacheMultiStore { + cms := cacheMultiStore{ + db: db.CacheWrap(), + version: rs.version, + lastCommitID: rs.lastCommitID, + substores: make(map[string]cwwWriter), len(rs.substores), + } + for name, substore := range rs.substores { + cms.substores[name] = substore.CacheWrap().(cwWriter) + } + return cms +} + +// Implements CacheMultiStore +func (cms cacheMultiStore) LastCommitID() CommitID { + return cms.lastCommitID +} + +// Implements CacheMultiStore +func (cms cacheMultiStore) CurrentVersion() int64 { + return cms.version +} + +// Implements CacheMultiStore +func (cms cacheMultiStore) Write() { + cms.db.Write() + for substore := range rs.substores { + substore.(cwWriter).Write() + } +} + +// Implements CacheMultiStore +func (rs cacheMultiStore) CacheMultiStore() CacheMultiStore { + return newCacheMultiStore(rs) +} + +// Implements CacheMultiStore +func (rs cacheMultiStore) GetCommitter(name string) Committer { + return rs.store[name] +} + +// Implements CacheMultiStore +func (rs cacheMultiStore) GetKVStore(name string) KVStore { + return rs.store[name].(KVStore) +} + +// Implements CacheMultiStore +func (rs cacheMultiStore) GetIterKVStore(name string) IterKVStore { + return rs.store[name].(IterKVStore) +} diff --git a/store/multistore.go b/store/multistore.go index b8fa38bff3..f433ec719f 100644 --- a/store/multistore.go +++ b/store/multistore.go @@ -11,12 +11,20 @@ import ( ) const ( - msLatestKey = "s/latest" - msStateKeyFmt = "s/%d" // s/ + latestVersionKey = "s/latest" + commitStateKeyFmt = "s/%d" // s/ ) type MultiStore interface { + // Last commit, or the zero CommitID. + // If not zero, CommitID.Version is CurrentVersion()-1. + LastCommitID() CommitID + + // Current version being worked on now, not yet committed. + // Should be greater than 0. + CurrentVersion() int64 + // Cache wrap MultiStore. // NOTE: Caller should probably not call .Write() on each, but // call CacheMultiStore.Write(). @@ -36,10 +44,13 @@ type CacheMultiStore interface { //---------------------------------------- // rootMultiStore is composed of many Committers. +// Name contrasts with cacheMultiStore which is for cache-wrapping +// other MultiStores. // Implements MultiStore. type rootMultiStore struct { db dbm.DB - version int64 + curVersion int64 + lastHash []byte storeLoaders map[string]CommitterLoader substores map[string]Committer } @@ -47,7 +58,8 @@ type rootMultiStore struct { func NewMultiStore(db dbm.DB) *rootMultiStore { return &rootMultiStore{ db: db, - version: 0, + curVersion: 0, + lastHash: nil, storeLoaders: make(map[string]CommitterLoader), substores: make(map[string]Committer), } @@ -60,70 +72,15 @@ func (rs *rootMultiStore) SetCommitterLoader(name string, loader CommitterLoader rs.storeLoaders[name] = loader } -//---------------------------------------- -// rootMultiStore state - -type msState struct { - Substores []substore -} - -func (ms *msState) Sort() { - ms.Substores.Sort() -} - -func (ms *msState) Hash() []byte { - m := make(map[string]interface{}, len(ms.Substores)) - for _, substore := range ms.Substores { - m[substore.name] = substore.subState - } - return merkle.SimpleHashFromMap(m) -} - -//---------------------------------------- -// substore state - -type substore struct { - name string - subState -} - -// This gets serialized by go-wire -type subState struct { - CommitID CommitID - // ... maybe add more state -} - -func (ss subState) Hash() []byte { - ssBytes, _ := wire.Marshal(ss) // Does not error - hasher := ripemd160.New() - hasher.Write(ssBytes) - return hasher.Sum(nil) -} - -//---------------------------------------- - // Call once after all calls to SetCommitterLoader are complete. func (rs *rootMultiStore) LoadLatestVersion() error { - ver := rs.getLatestVersion() + ver := getLatestVersion(rs.db) rs.LoadVersion(ver) } -func (rs *rootMultiStore) getLatestVersion() int64 { - var latest int64 - latestBytes := rs.db.Get(msLatestKey) - if latestBytes == nil { - return 0 - } - err := wire.Unmarshal(latestBytes, &latest) - if err != nil { - panic(err) - } - return latest -} - // NOTE: Returns 0 unless LoadVersion() or LoadLatestVersion() is called. -func (rs *rootMultiStore) GetVersion() int64 { - return rs.version +func (rs *rootMultiStore) GetCurrentVersion() int64 { + return rs.curVersion } func (rs *rootMultiStore) LoadVersion(ver int64) error { @@ -131,26 +88,20 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error { // Special logic for version 0 if ver == 0 { for name, storeLoader := range rs.storeLoaders { - store, err := storeLoader(CommitID{Version: 0}) + store, err := storeLoader(CommitID{}) if err != nil { return fmt.Errorf("Failed to load rootMultiStore: %v", err) } + rs.curVersion = 1 + rs.lastHash = nil rs.substores[name] = store } return nil } // Otherwise, version is 1 or greater - msStateKey := fmt.Sprintf(msStateKeyFmt, ver) - stateBytes := rs.db.Get(msStateKey, ver) - if bz == nil { - return fmt.Errorf("Failed to load rootMultiStore: no data") - } - var state msState - err := wire.Unmarshal(stateBytes, &state) - if err != nil { - return fmt.Errorf("Failed to load rootMultiStore: %v", err) - } + // Load commitState + var state commitState = loadCommitState(rs.db, ver) // Load each Substore var newSubstores = make(map[string]Committer) @@ -175,42 +126,86 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error { } // Success. - rs.version = ver + rs.curVersion = ver + 1 + rs.lastHash = state.LastHash rs.substores = newSubstores return nil } -// Implements Committer -func (rs *rootMultiStore) Commit() CommitID { +// Commits each substore and gets commitState. +func (rs *RootMultiStore) doCommit() commitState { + version := rs.curVersion + lastHash := rs.LastHash + substores := make([]substore, len(rs.substores)) - // Needs to be transactional - batch := rs.db.NewBatch() - - // Save msState - var state msState for name, store := range rs.substores { + // Commit commitID := store.Commit() - state.Substores = append(state.Substores, - subState{ + + // Record CommitID + substores = append(substores, + substore{ Name: name, CommitID: commitID, }, ) } - state.Sort() + + // Incr curVersion + rs.curVersion += 1 + + return commitState{ + Version: version, + LastHash: lastHash, + Substores: substores, + } +} + +//---------------------------------------- + +// Implements Committer +func (rs *rootMultiStore) Commit() CommitID { + + version := rs.version + + // Needs to be transactional + batch := rs.db.NewBatch() + + // Commit each substore and get commitState + state := rs.doCommit() stateBytes, err := wire.Marshal(state) if err != nil { panic(err) } - msStateKey := fmt.Sprintf(msStateKeyFmt, rs.version) - batch.Set(msStateKey, stateBytes) + commitStateKey := fmt.Sprintf(commitStateKeyFmt, rs.version) + batch.Set(commitStateKey, stateBytes) - // Save msLatest + // Save the latest version latestBytes, _ := wire.Marshal(rs.version) // Does not error - batch.Set(msLatestKey, latestBytes) + batch.Set(latestVersionKey, latestBytes) batch.Write() - batch.version += 1 + rs.version += 1 + + return CommitID{ + Version: version, + Hash: state.Hash(), + } + +} + +// Get the last committed CommitID +func (rs *rootMultiStore) LastCommitID() CommitID { + + // If we haven't committed yet, return a zero CommitID + if rs.curVersion == 0 { + return CommitID{} + } + + return CommitID{ + Version: rs.curVersion - 1, + Hash: rs.LastHash, + } } // Implements MultiStore @@ -234,74 +229,87 @@ func (rs *rootMultiStore) GetIterKVStore(name string) IterKVStore { } //---------------------------------------- -// subStates +// commitState -type subStates []subState +// NOTE: Keep commitState a simple immutable struct. +type commitState struct { -func (ssz subStates) Len() int { return len(ssz) } -func (ssz subStates) Less(i, j int) bool { return ssz[i].Key < ssz[j].Key } -func (ssz subStates) Swap(i, j int) { ssz[i], ssz[j] = ssz[j], ssz[i] } -func (ssz subStates) Sort() { sort.Sort(ssz) } + // Version + Version int64 -func (ssz subStates) Hash() []byte { - hz := make([]merkle.Hashable, len(ssz)) - for i, ss := range ssz { - hz[i] = ss + // Last hash (memoization) + LastHash []byte + + // Substore info for + Substores []substore +} + +// loads commitState from disk. +func loadCommitState(db dbm.DB, ver int64) (commitState, error) { + + // Load from DB. + commitStateKey := fmt.Sprintf(commitStateKeyFmt, ver) + stateBytes := db.Get(commitStateKey, ver) + if bz == nil { + return commitState{}, fmt.Errorf("Failed to load rootMultiStore: no data") + } + + // Parse bytes. + var state commitState + err := wire.Unmarshal(stateBytes, &state) + if err != nil { + return commitState{}, fmt.Errorf("Failed to load rootMultiStore: %v", err) + } + return state, nil +} + +func (cs commitState) Hash() []byte { + // TODO cache to cs.hash []byte + m := make(map[string]interface{}, len(cs.Substores)) + for _, substore := range cs.Substores { + m[substore.Name] = substore + } + return merkle.SimpleHashFromMap(m) +} + +func (cs commitState) CommitID() CommitID { + return CommitID{ + Version: cs.Version, + Hash: cs.Hash(), } - return merkle.SimpleHashFromHashables(hz) } //---------------------------------------- -// cacheMultiStore +// substore state -type cwWriter interface { - Write() +type substore struct { + Name string + substoreCore } -// cacheMultiStore holds many CacheWrap'd stores. -// Implements MultiStore. -type cacheMultiStore struct { - db dbm.DB - version int64 - substores map[string]cwWriter +type substoreCore struct { + CommitID CommitID + // ... maybe add more state } -func newCacheMultiStore(rs *rootMultiStore) cacheMultiStore { - cms := cacheMultiStore{ - db: db.CacheWrap(), - version: rs.version, - substores: make(map[string]cwwWriter), len(rs.substores), +func (sc substoreCore) Hash() []byte { + scBytes, _ := wire.Marshal(sc) // Does not error + hasher := ripemd160.New() + hasher.Write(scBytes) + return hasher.Sum(nil) +} + +//---------------------------------------- + +func getLatestVersion(db dbm.DB) int64 { + var latest int64 + latestBytes := db.Get(latestVersionKey) + if latestBytes == nil { + return 0 } - for name, substore := range rs.substores { - cms.substores[name] = substore.CacheWrap().(cwWriter) + err := wire.Unmarshal(latestBytes, &latest) + if err != nil { + panic(err) } - return cms -} - -// Implements CacheMultiStore -func (cms cacheMultiStore) Write() { - cms.db.Write() - for substore := range rs.substores { - substore.(cwWriter).Write() - } -} - -// Implements CacheMultiStore -func (rs cacheMultiStore) CacheMultiStore() CacheMultiStore { - return newCacheMultiStore(rs) -} - -// Implements CacheMultiStore -func (rs cacheMultiStore) GetCommitter(name string) Committer { - return rs.store[name] -} - -// Implements CacheMultiStore -func (rs cacheMultiStore) GetKVStore(name string) KVStore { - return rs.store[name].(KVStore) -} - -// Implements CacheMultiStore -func (rs cacheMultiStore) GetIterKVStore(name string) IterKVStore { - return rs.store[name].(IterKVStore) + return latest } diff --git a/store/types.go b/store/types.go index d88207f7a3..44e2673d07 100644 --- a/store/types.go +++ b/store/types.go @@ -5,10 +5,14 @@ import ( ) type CommitID struct { - Version uint64 + Version int64 Hash []byte } +type (cid CommitID) IsZero() bool { + return cid.Version == 0 && len(cid.Hash) == 0 +} + type Committer interface { // Commit persists the state to disk.