Merge pull request #6756 from filecoin-project/feat/splitstore-reorg

Splitstore code reorg
This commit is contained in:
Steven Allen 2021-07-14 16:57:11 -07:00 committed by GitHub
commit 81d614a607
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1296 additions and 1252 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,67 @@
package splitstore
import (
"encoding/binary"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/filecoin-project/go-state-types/abi"
)
func epochToBytes(epoch abi.ChainEpoch) []byte {
return uint64ToBytes(uint64(epoch))
}
func bytesToEpoch(buf []byte) abi.ChainEpoch {
return abi.ChainEpoch(bytesToUint64(buf))
}
func int64ToBytes(i int64) []byte {
return uint64ToBytes(uint64(i))
}
func bytesToInt64(buf []byte) int64 {
return int64(bytesToUint64(buf))
}
func uint64ToBytes(i uint64) []byte {
buf := make([]byte, 16)
n := binary.PutUvarint(buf, i)
return buf[:n]
}
func bytesToUint64(buf []byte) uint64 {
i, _ := binary.Uvarint(buf)
return i
}
func isUnitaryObject(c cid.Cid) bool {
pre := c.Prefix()
switch pre.Codec {
case cid.FilCommitmentSealed, cid.FilCommitmentUnsealed:
return true
default:
return pre.MhType == mh.IDENTITY
}
}
func isIdentiyCid(c cid.Cid) bool {
return c.Prefix().MhType == mh.IDENTITY
}
func decodeIdentityCid(c cid.Cid) ([]byte, error) {
dmh, err := mh.Decode(c.Hash())
if err != nil {
return nil, xerrors.Errorf("error decoding identity cid %s: %w", c, err)
}
// sanity check
if dmh.Code != mh.IDENTITY {
return nil, xerrors.Errorf("error decoding identity cid %s: hash type is not identity", c)
}
return dmh.Digest, nil
}

View File

@ -0,0 +1,126 @@
package splitstore
import (
"sync/atomic"
"time"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
)
// warmup acuiqres the compaction lock and spawns a goroutine to warm up the hotstore;
// this is necessary when we sync from a snapshot or when we enable the splitstore
// on top of an existing blockstore (which becomes the coldstore).
func (s *SplitStore) warmup(curTs *types.TipSet) error {
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
return xerrors.Errorf("error locking compaction")
}
go func() {
defer atomic.StoreInt32(&s.compacting, 0)
log.Info("warming up hotstore")
start := time.Now()
err := s.doWarmup(curTs)
if err != nil {
log.Errorf("error warming up hotstore: %s", err)
return
}
log.Infow("warm up done", "took", time.Since(start))
}()
return nil
}
// the actual warmup procedure; it walks the chain loading all state roots at the boundary
// and headers all the way up to genesis.
// objects are written in batches so as to minimize overhead.
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
epoch := curTs.Height()
batchHot := make([]blocks.Block, 0, batchSize)
count := int64(0)
xcount := int64(0)
missing := int64(0)
err := s.walkChain(curTs, epoch, false,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
count++
has, err := s.hot.Has(c)
if err != nil {
return err
}
if has {
return nil
}
blk, err := s.cold.Get(c)
if err != nil {
if err == bstore.ErrNotFound {
missing++
return nil
}
return err
}
xcount++
batchHot = append(batchHot, blk)
if len(batchHot) == batchSize {
err = s.hot.PutMany(batchHot)
if err != nil {
return err
}
batchHot = batchHot[:0]
}
return nil
})
if err != nil {
return err
}
if len(batchHot) > 0 {
err = s.hot.PutMany(batchHot)
if err != nil {
return err
}
}
log.Infow("warmup stats", "visited", count, "warm", xcount, "missing", missing)
s.markSetSize = count + count>>2 // overestimate a bit
err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Warnf("error saving mark set size: %s", err)
}
// save the warmup epoch
err = s.ds.Put(warmupEpochKey, epochToBytes(epoch))
if err != nil {
return xerrors.Errorf("error saving warm up epoch: %w", err)
}
s.mx.Lock()
s.warmupEpoch = epoch
s.mx.Unlock()
// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
err = s.ds.Put(compactionIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}
return nil
}