code reorg: break splitstore.go into smaller logical units
This commit is contained in:
parent
44d01712c8
commit
5a23f64b3b
File diff suppressed because it is too large
Load Diff
1094
blockstore/splitstore/splitstore_compact.go
Normal file
1094
blockstore/splitstore/splitstore_compact.go
Normal file
File diff suppressed because it is too large
Load Diff
67
blockstore/splitstore/splitstore_util.go
Normal file
67
blockstore/splitstore/splitstore_util.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package splitstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
cid "github.com/ipfs/go-cid"
|
||||||
|
mh "github.com/multiformats/go-multihash"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
)
|
||||||
|
|
||||||
|
func epochToBytes(epoch abi.ChainEpoch) []byte {
|
||||||
|
return uint64ToBytes(uint64(epoch))
|
||||||
|
}
|
||||||
|
|
||||||
|
func bytesToEpoch(buf []byte) abi.ChainEpoch {
|
||||||
|
return abi.ChainEpoch(bytesToUint64(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func int64ToBytes(i int64) []byte {
|
||||||
|
return uint64ToBytes(uint64(i))
|
||||||
|
}
|
||||||
|
|
||||||
|
func bytesToInt64(buf []byte) int64 {
|
||||||
|
return int64(bytesToUint64(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
func uint64ToBytes(i uint64) []byte {
|
||||||
|
buf := make([]byte, 16)
|
||||||
|
n := binary.PutUvarint(buf, i)
|
||||||
|
return buf[:n]
|
||||||
|
}
|
||||||
|
|
||||||
|
func bytesToUint64(buf []byte) uint64 {
|
||||||
|
i, _ := binary.Uvarint(buf)
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
func isUnitaryObject(c cid.Cid) bool {
|
||||||
|
pre := c.Prefix()
|
||||||
|
switch pre.Codec {
|
||||||
|
case cid.FilCommitmentSealed, cid.FilCommitmentUnsealed:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return pre.MhType == mh.IDENTITY
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isIdentiyCid(c cid.Cid) bool {
|
||||||
|
return c.Prefix().MhType == mh.IDENTITY
|
||||||
|
}
|
||||||
|
|
||||||
|
func decodeIdentityCid(c cid.Cid) ([]byte, error) {
|
||||||
|
dmh, err := mh.Decode(c.Hash())
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("error decoding identity cid %s: %w", c, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sanity check
|
||||||
|
if dmh.Code != mh.IDENTITY {
|
||||||
|
return nil, xerrors.Errorf("error decoding identity cid %s: hash type is not identity", c)
|
||||||
|
}
|
||||||
|
|
||||||
|
return dmh.Digest, nil
|
||||||
|
}
|
126
blockstore/splitstore/splitstore_warmup.go
Normal file
126
blockstore/splitstore/splitstore_warmup.go
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
package splitstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
blocks "github.com/ipfs/go-block-format"
|
||||||
|
cid "github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
bstore "github.com/filecoin-project/lotus/blockstore"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// warmup acuiqres the compaction lock and spawns a goroutine to warm up the hotstore;
|
||||||
|
// this is necessary when we sync from a snapshot or when we enable the splitstore
|
||||||
|
// on top of an existing blockstore (which becomes the coldstore).
|
||||||
|
func (s *SplitStore) warmup(curTs *types.TipSet) error {
|
||||||
|
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
|
||||||
|
return xerrors.Errorf("error locking compaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer atomic.StoreInt32(&s.compacting, 0)
|
||||||
|
|
||||||
|
log.Info("warming up hotstore")
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
err := s.doWarmup(curTs)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error warming up hotstore: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("warm up done", "took", time.Since(start))
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// the actual warmup procedure; it walks the chain loading all state roots at the boundary
|
||||||
|
// and headers all the way up to genesis.
|
||||||
|
// objects are written in batches so as to minimize overhead.
|
||||||
|
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
||||||
|
epoch := curTs.Height()
|
||||||
|
batchHot := make([]blocks.Block, 0, batchSize)
|
||||||
|
count := int64(0)
|
||||||
|
xcount := int64(0)
|
||||||
|
missing := int64(0)
|
||||||
|
err := s.walkChain(curTs, epoch, false,
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
if isUnitaryObject(c) {
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
|
|
||||||
|
has, err := s.hot.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if has {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
blk, err := s.cold.Get(c)
|
||||||
|
if err != nil {
|
||||||
|
if err == bstore.ErrNotFound {
|
||||||
|
missing++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
xcount++
|
||||||
|
|
||||||
|
batchHot = append(batchHot, blk)
|
||||||
|
if len(batchHot) == batchSize {
|
||||||
|
err = s.hot.PutMany(batchHot)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
batchHot = batchHot[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(batchHot) > 0 {
|
||||||
|
err = s.hot.PutMany(batchHot)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("warmup stats", "visited", count, "warm", xcount, "missing", missing)
|
||||||
|
|
||||||
|
s.markSetSize = count + count>>2 // overestimate a bit
|
||||||
|
err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error saving mark set size: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// save the warmup epoch
|
||||||
|
err = s.ds.Put(warmupEpochKey, epochToBytes(epoch))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error saving warm up epoch: %w", err)
|
||||||
|
}
|
||||||
|
s.mx.Lock()
|
||||||
|
s.warmupEpoch = epoch
|
||||||
|
s.mx.Unlock()
|
||||||
|
|
||||||
|
// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
|
||||||
|
err = s.ds.Put(compactionIndexKey, int64ToBytes(s.compactionIndex))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error saving compaction index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user