feat: chain: splitstore chain prune (#9056)

* Splitstore chain prune
* Protect on reification for simpler logic and sound cold compact protect
* Recovery from checkpoint during chain prune
* Splitstore (discard and universal mode) running in itests
* Add pause and restart functions to itest block miner
* Add config options to itest full nodes
* Add FsRepo support for itest full nodes

Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com>

parent 881e16ec75
commit 0c91b0dc10
@@ -1002,6 +1002,11 @@ workflows:
      suite: itest-self_sent_txn
      target: "./itests/self_sent_txn_test.go"

  - test:
      name: test-itest-splitstore
      suite: itest-splitstore
      target: "./itests/splitstore_test.go"

  - test:
      name: test-itest-tape
      suite: itest-tape
@@ -169,6 +169,10 @@ type FullNode interface {
	// If oldmsgskip is set, messages from before the requested roots are also not included.
	ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read

	// ChainPrune prunes the stored chain state and garbage collects; only supported if you
	// are using the splitstore
	ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin

	// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
	// if supported by the underlying implementation.
	ChainCheckBlockstore(context.Context) error //perm:admin

@@ -1219,3 +1223,8 @@ type MsigTransaction struct {
	Approved []address.Address
}

type PruneOpts struct {
	MovingGC    bool
	RetainState int64
}
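For orientation, a minimal sketch of driving this endpoint from Go (the connected `fullNode` client and the helper name are illustrative assumptions, not part of this diff):

	// pruneColdstore asks a node to prune dead objects from its coldstore.
	// fullNode is assumed to be an already-connected api.FullNode client.
	func pruneColdstore(ctx context.Context, fullNode api.FullNode) error {
		opts := api.PruneOpts{
			MovingGC:    false, // online GC on the coldstore
			RetainState: -1,    // retain all chain-reachable state (the safe default)
		}
		return fullNode.ChainPrune(ctx, opts)
	}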
@@ -377,6 +377,20 @@ func (mr *MockFullNodeMockRecorder) ChainNotify(arg0 interface{}) *gomock.Call {
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainNotify", reflect.TypeOf((*MockFullNode)(nil).ChainNotify), arg0)
}

// ChainPrune mocks base method.
func (m *MockFullNode) ChainPrune(arg0 context.Context, arg1 api.PruneOpts) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "ChainPrune", arg0, arg1)
	ret0, _ := ret[0].(error)
	return ret0
}

// ChainPrune indicates an expected call of ChainPrune.
func (mr *MockFullNodeMockRecorder) ChainPrune(arg0, arg1 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainPrune", reflect.TypeOf((*MockFullNode)(nil).ChainPrune), arg0, arg1)
}

// ChainPutObj mocks base method.
func (m *MockFullNode) ChainPutObj(arg0 context.Context, arg1 blocks.Block) error {
	m.ctrl.T.Helper()
@@ -145,6 +145,8 @@ type FullNodeStruct struct {
		ChainNotify func(p0 context.Context) (<-chan []*HeadChange, error) `perm:"read"`

		ChainPrune func(p0 context.Context, p1 PruneOpts) error `perm:"admin"`

		ChainPutObj func(p0 context.Context, p1 blocks.Block) error `perm:"admin"`

		ChainReadObj func(p0 context.Context, p1 cid.Cid) ([]byte, error) `perm:"read"`

@@ -1354,6 +1356,17 @@ func (s *FullNodeStub) ChainNotify(p0 context.Context) (<-chan []*HeadChange, er
	return nil, ErrNotSupported
}

func (s *FullNodeStruct) ChainPrune(p0 context.Context, p1 PruneOpts) error {
	if s.Internal.ChainPrune == nil {
		return ErrNotSupported
	}
	return s.Internal.ChainPrune(p0, p1)
}

func (s *FullNodeStub) ChainPrune(p0 context.Context, p1 PruneOpts) error {
	return ErrNotSupported
}

func (s *FullNodeStruct) ChainPutObj(p0 context.Context, p1 blocks.Block) error {
	if s.Internal.ChainPutObj == nil {
		return ErrNotSupported
@@ -102,6 +102,14 @@ func (b *idstore) Put(ctx context.Context, blk blocks.Block) error {
	return b.bs.Put(ctx, blk)
}

func (b *idstore) ForEachKey(f func(cid.Cid) error) error {
	iterBstore, ok := b.bs.(BlockstoreIterator)
	if !ok {
		return xerrors.Errorf("underlying blockstore (type %T) doesn't support fast iteration", b.bs)
	}
	return iterBstore.ForEachKey(f)
}

func (b *idstore) PutMany(ctx context.Context, blks []blocks.Block) error {
	toPut := make([]blocks.Block, 0, len(blks))
	for _, blk := range blks {
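ForEachKey simply forwards to the wrapped blockstore when it supports fast iteration. A caller-side sketch (the `bs` variable is hypothetical) that counts keys:

	// Count keys in a blockstore that supports fast iteration.
	var n int
	err := bs.ForEachKey(func(c cid.Cid) error {
		n++
		return nil
	})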
@@ -14,27 +14,28 @@ func NewMemory() MemBlockstore {
}

 // MemBlockstore is a terminal blockstore that keeps blocks in memory.
-type MemBlockstore map[cid.Cid]blocks.Block
+// To match behavior of badger blockstore we index by multihash only.
+type MemBlockstore map[string]blocks.Block

 func (m MemBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
-	delete(m, k)
+	delete(m, string(k.Hash()))
	return nil
}

 func (m MemBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
	for _, k := range ks {
-		delete(m, k)
+		delete(m, string(k.Hash()))
	}
	return nil
}

 func (m MemBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
-	_, ok := m[k]
+	_, ok := m[string(k.Hash())]
	return ok, nil
}

 func (m MemBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
-	b, ok := m[k]
+	b, ok := m[string(k.Hash())]
	if !ok {
		return ipld.ErrNotFound{Cid: k}
	}

@@ -42,7 +43,7 @@ func (m MemBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte
}

 func (m MemBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
-	b, ok := m[k]
+	b, ok := m[string(k.Hash())]
	if !ok {
		return nil, ipld.ErrNotFound{Cid: k}
	}

@@ -51,7 +52,7 @@ func (m MemBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error)
// GetSize returns the CIDs mapped BlockSize
 func (m MemBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
-	b, ok := m[k]
+	b, ok := m[string(k.Hash())]
	if !ok {
		return 0, ipld.ErrNotFound{Cid: k}
	}

@@ -62,7 +63,7 @@ func (m MemBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
 func (m MemBlockstore) Put(ctx context.Context, b blocks.Block) error {
	// Convert to a basic block for safety, but try to reuse the existing
	// block if it's already a basic block.
-	k := b.Cid()
+	k := string(b.Cid().Hash())
	if _, ok := b.(*blocks.BasicBlock); !ok {
		// If we already have the block, abort.
		if _, ok := m[k]; ok {

@@ -71,7 +72,7 @@ func (m MemBlockstore) Put(ctx context.Context, b blocks.Block) error {
		// the error is only for debugging.
		b, _ = blocks.NewBlockWithCid(b.RawData(), b.Cid())
	}
-	m[b.Cid()] = b
+	m[k] = b
	return nil
}

@@ -89,8 +90,8 @@ func (m MemBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
// the given context, closing the channel if it becomes Done.
 func (m MemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	ch := make(chan cid.Cid, len(m))
-	for k := range m {
-		ch <- k
+	for _, b := range m {
+		ch <- b.Cid()
	}
	close(ch)
	return ch, nil
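MemBlockstore now keys entries by multihash rather than full CID, so two CIDs that differ only in codec resolve to the same stored block, matching the badger blockstore. A small sketch of the equivalence using the go-cid API (illustrative; `someBlock` is a placeholder):

	mh := someBlock.Cid().Hash()
	rawCid := cid.NewCidV1(cid.Raw, mh)
	cborCid := cid.NewCidV1(cid.DagCBOR, mh)
	// rawCid != cborCid, but string(rawCid.Hash()) == string(cborCid.Hash()),
	// so both resolve to the same MemBlockstore entry.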
@@ -43,8 +43,13 @@ var (
	// compactionIndexKey stores the compaction index (serial number)
	compactionIndexKey = dstore.NewKey("/splitstore/compactionIndex")

	// pruneIndexKey stores the prune index (serial number)
	pruneIndexKey = dstore.NewKey("/splitstore/pruneIndex")

	log = logging.Logger("splitstore")

	errClosing = errors.New("splitstore is closing")

	// set this to true if you are debugging the splitstore to enable debug logging
	enableDebugLog = false
	// set this to true if you want to track origin stack traces in the write log

@@ -54,6 +59,16 @@ var (
	upgradeBoundary = build.Finality
)

type CompactType int

const (
	none CompactType = iota
	warmup
	hot
	cold
	check
)

func init() {
	if os.Getenv("LOTUS_SPLITSTORE_DEBUG_LOG") == "1" {
		enableDebugLog = true

@@ -117,8 +132,9 @@ type hotstore interface {
}

type SplitStore struct {
-	compacting int32 // compaction/prune/warmup in progress
-	closing    int32 // the splitstore is closing
+	compacting  int32       // flag for when compaction is in progress
+	compactType CompactType // compaction type, protected by compacting atomic, only meaningful when compacting == 1
+	closing     int32       // the splitstore is closing

	cfg  *Config
	path string

@@ -140,6 +156,7 @@ type SplitStore struct {
	markSetSize int64

	compactionIndex int64
	pruneIndex      int64

	ctx    context.Context
	cancel func()

@@ -227,6 +244,13 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
			return nil, xerrors.Errorf("error resuming compaction: %w", err)
		}
	}
	if ss.pruneCheckpointExists() {
		log.Info("found prune checkpoint; resuming prune")
		if err := ss.completePrune(); err != nil {
			markSetEnv.Close() //nolint:errcheck
			return nil, xerrors.Errorf("error resuming prune: %w", err)
		}
	}

	return ss, nil
}

@@ -260,8 +284,14 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
		if has {
			return s.has(cid)
		}
-		return s.cold.Has(ctx, cid)
+		switch s.compactType {
+		case hot:
+			return s.cold.Has(ctx, cid)
+		case cold:
+			return s.hot.Has(ctx, cid)
+		default:
+			return false, xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
+		}
	}

	has, err := s.hot.Has(ctx, cid)

@@ -276,8 +306,11 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
	}

	has, err = s.cold.Has(ctx, cid)
-	if has && bstore.IsHotView(ctx) {
-		s.reifyColdObject(cid)
+	if has {
+		s.trackTxnRef(cid)
+		if bstore.IsHotView(ctx) {
+			s.reifyColdObject(cid)
+		}
	}

	return has, err

@@ -307,8 +340,14 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
		if has {
			return s.get(cid)
		}
-		return s.cold.Get(ctx, cid)
+		switch s.compactType {
+		case hot:
+			return s.cold.Get(ctx, cid)
+		case cold:
+			return s.hot.Get(ctx, cid)
+		default:
+			return nil, xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
+		}
	}

	blk, err := s.hot.Get(ctx, cid)

@@ -325,6 +364,7 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
	blk, err = s.cold.Get(ctx, cid)
	if err == nil {
+		s.trackTxnRef(cid)
		if bstore.IsHotView(ctx) {
			s.reifyColdObject(cid)
		}

@@ -361,8 +401,14 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
		if has {
			return s.getSize(cid)
		}
-		return s.cold.GetSize(ctx, cid)
+		switch s.compactType {
+		case hot:
+			return s.cold.GetSize(ctx, cid)
+		case cold:
+			return s.hot.GetSize(ctx, cid)
+		default:
+			return 0, xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
+		}
	}

	size, err := s.hot.GetSize(ctx, cid)

@@ -379,6 +425,7 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
	size, err = s.cold.GetSize(ctx, cid)
	if err == nil {
+		s.trackTxnRef(cid)
		if bstore.IsHotView(ctx) {
			s.reifyColdObject(cid)
		}

@@ -408,12 +455,12 @@ func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error {
	s.debug.LogWrite(blk)

	// critical section
-	if s.txnMarkSet != nil {
+	if s.txnMarkSet != nil && s.compactType == hot { // puts only touch hot store
		s.markLiveRefs([]cid.Cid{blk.Cid()})
		return nil
	}

	s.trackTxnRef(blk.Cid())

	return nil
}

@@ -459,12 +506,12 @@ func (s *SplitStore) PutMany(ctx context.Context, blks []blocks.Block) error {
	s.debug.LogWriteMany(blks)

	// critical section
-	if s.txnMarkSet != nil {
+	if s.txnMarkSet != nil && s.compactType == hot { // puts only touch hot store
		s.markLiveRefs(batch)
		return nil
	}

	s.trackTxnRefMany(batch)

	return nil
}

@@ -536,8 +583,14 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
		if has {
			return s.view(cid, cb)
		}
-		return s.cold.View(ctx, cid, cb)
+		switch s.compactType {
+		case hot:
+			return s.cold.View(ctx, cid, cb)
+		case cold:
+			return s.hot.View(ctx, cid, cb)
+		default:
+			return xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
+		}
	}

	// views are (optimistically) protected two-fold:
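Every state-mutating operation above is gated by the same discipline: a CAS on the `compacting` flag serializes warmup, compaction, prune, and check, and `compactType` is only meaningful while that flag is held. A minimal restatement of the acquire/release pattern used throughout (illustrative, not new API):

	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
		return xerrors.Errorf("compacting operation in progress")
	}
	s.compactType = cold // safe to write: we hold the compacting flag
	defer atomic.StoreInt32(&s.compacting, 0)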
@@ -25,6 +25,7 @@ func (s *SplitStore) Check() error {
	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
		return xerrors.Errorf("can't acquire compaction lock; compacting operation in progress")
	}
	s.compactType = check

	if s.compactionIndex == 0 {
		atomic.StoreInt32(&s.compacting, 0)

@@ -146,6 +147,8 @@ func (s *SplitStore) Info() map[string]interface{} {
	info["base epoch"] = s.baseEpoch
	info["warmup epoch"] = s.warmupEpoch
	info["compactions"] = s.compactionIndex
	info["prunes"] = s.pruneIndex
	info["compacting"] = s.compacting == 1

	sizer, ok := s.hot.(bstore.BlockstoreSize)
	if ok {
@@ -52,6 +52,12 @@ var (
	// SyncWaitTime is the time delay from a tipset's min timestamp before we decide
	// we have synced.
	SyncWaitTime = 30 * time.Second

	// CheckSyncGap is a testing flag that should always be true when running a node. itests rely on
	// the rough hack of starting genesis so far in the past that they exercise catchup mining to mine
	// blocks quickly, so disabling sync-gap checking is necessary to test compaction
	// without a deep structural improvement of itests.
	CheckSyncGap = true
)

var (

@@ -81,7 +87,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
	// this is guaranteed by the chainstore, and it is pervasive in all lotus
	// -- if that ever changes then all hell will break loose in general and
	// we will have a race to protectTipSets here.
-	// Reagrdless, we put a mutex in HeadChange just to be safe
+	// Regardless, we put a mutex in HeadChange just to be safe

	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
		// we are currently compacting -- protect the new tipset(s)

@@ -96,7 +102,8 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
	}

	timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
-	if time.Since(timestamp) > SyncGapTime {
+	if CheckSyncGap && time.Since(timestamp) > SyncGapTime {
		// don't attempt compaction before we have caught up syncing
		atomic.StoreInt32(&s.compacting, 0)
		return nil

@@ -111,6 +118,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
	if epoch-s.baseEpoch > CompactionThreshold {
		// it's time to compact -- prepare the transaction and go!
		s.beginTxnProtect()
		s.compactType = hot
		go func() {
			defer atomic.StoreInt32(&s.compacting, 0)
			defer s.endTxnProtect()

@@ -587,7 +595,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
	// some stats for logging
	var hotCnt, coldCnt int
-
	err = s.hot.ForEachKey(func(c cid.Cid) error {
		// was it marked?
		mark, err := markSet.Has(c)

@@ -608,7 +615,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
		return nil
	})
-
	if err != nil {
		return xerrors.Errorf("error collecting cold objects: %w", err)
	}

@@ -777,6 +783,10 @@ func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {

func (s *SplitStore) waitForSync() {
	log.Info("waiting for sync")
	if !CheckSyncGap {
		log.Warnf("If you see this outside of test it is a serious splitstore issue")
		return
	}
	startWait := time.Now()
	defer func() {
		log.Infow("waiting for sync done", "took", time.Since(startWait))

@@ -1115,7 +1125,6 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
		if err := s.checkClosing(); err != nil {
			return err
		}
-
		blk, err := s.hot.Get(s.ctx, c)
		if err != nil {
			if ipld.IsNotFound(err) {

@@ -1133,6 +1142,7 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
			return xerrors.Errorf("error putting batch to coldstore: %w", err)
		}
		batch = batch[:0]
+
	}

	return nil

@@ -1221,8 +1231,17 @@ func (s *SplitStore) purgeBatch(batch, deadCids []cid.Cid, checkpoint *Checkpoin
		return 0, liveCnt, nil
	}

-	if err := s.hot.DeleteMany(s.ctx, deadCids); err != nil {
-		return 0, liveCnt, xerrors.Errorf("error purging cold objects: %w", err)
-	}
+	switch s.compactType {
+	case hot:
+		if err := s.hot.DeleteMany(s.ctx, deadCids); err != nil {
+			return 0, liveCnt, xerrors.Errorf("error purging cold objects: %w", err)
+		}
+	case cold:
+		if err := s.cold.DeleteMany(s.ctx, deadCids); err != nil {
+			return 0, liveCnt, xerrors.Errorf("error purging dead objects: %w", err)
+		}
+	default:
+		return 0, liveCnt, xerrors.Errorf("invalid compaction type %d, only hot and cold allowed for critical section", s.compactType)
+	}

	s.debug.LogDelete(deadCids)

@@ -1239,15 +1258,28 @@ func (s *SplitStore) coldSetPath() string {
	return filepath.Join(s.path, "coldset")
}

func (s *SplitStore) deadSetPath() string {
	return filepath.Join(s.path, "deadset")
}

func (s *SplitStore) checkpointPath() string {
	return filepath.Join(s.path, "checkpoint")
}

func (s *SplitStore) pruneCheckpointPath() string {
	return filepath.Join(s.path, "prune-checkpoint")
}

func (s *SplitStore) checkpointExists() bool {
	_, err := os.Stat(s.checkpointPath())
	return err == nil
}

func (s *SplitStore) pruneCheckpointExists() bool {
	_, err := os.Stat(s.pruneCheckpointPath())
	return err == nil
}

func (s *SplitStore) completeCompaction() error {
	checkpoint, last, err := OpenCheckpoint(s.checkpointPath())
	if err != nil {

@@ -1268,6 +1300,7 @@ func (s *SplitStore) completeCompaction() error {
	defer markSet.Close() //nolint:errcheck

	// PURGE
	s.compactType = hot
	log.Info("purging cold objects from the hotstore")
	startPurge := time.Now()
	err = s.completePurge(coldr, checkpoint, last, markSet)

@@ -1290,6 +1323,7 @@ func (s *SplitStore) completeCompaction() error {
	if err := os.Remove(s.coldSetPath()); err != nil {
		log.Warnf("error removing coldset: %s", err)
	}
	s.compactType = none

	// Note: at this point we can start the splitstore; a compaction should run on
	// the first head change, which will trigger gc on the hotstore.
@@ -43,6 +43,7 @@ func (es *exposedSplitStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
}

func (es *exposedSplitStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
+
	if isIdentiyCid(c) {
		data, err := decodeIdentityCid(c)
		if err != nil {
@@ -27,7 +27,7 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
		return err
	}

-	log.Infow("garbage collecting hotstore done", "took", time.Since(startGC))
+	log.Infow("garbage collecting blockstore done", "took", time.Since(startGC))
	return nil
}
blockstore/splitstore/splitstore_prune.go (new file, 564 lines)
@@ -0,0 +1,564 @@
package splitstore

import (
	"bytes"
	"os"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	cbg "github.com/whyrusleeping/cbor-gen"
	"go.opencensus.io/stats"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/api"
	bstore "github.com/filecoin-project/lotus/blockstore"
	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/metrics"
)

var (
	// PruneOnlineGC is a prune option that instructs PruneChain to use online gc for reclaiming space;
	// there is no value associated with this option.
	PruneOnlineGC = "splitstore.PruneOnlineGC"

	// PruneMovingGC is a prune option that instructs PruneChain to use moving gc for reclaiming space;
	// the value associated with this option is the path of the new coldstore.
	PruneMovingGC = "splitstore.PruneMovingGC"

	// PruneRetainState is a prune option that instructs PruneChain as to how many finalities worth
	// of state to retain in the coldstore.
	// The value is an integer:
	// - if it is -1 then all state objects reachable from the chain will be retained in the coldstore;
	//   this is useful for garbage collecting side-chains and other garbage in archival nodes.
	//   This is the (safe) default.
	// - if it is 0 then no state objects that are unreachable within the compaction boundary will
	//   be retained in the coldstore.
	// - if it is a positive integer, then it's the number of finalities past the compaction boundary
	//   for which chain-reachable state objects are retained.
	PruneRetainState = "splitstore.PruneRetainState"
)

// PruneChain instructs the SplitStore to prune chain state in the coldstore, according to the
// options specified.
func (s *SplitStore) PruneChain(opts api.PruneOpts) error {
	retainState := opts.RetainState

	var gcOpts []bstore.BlockstoreGCOption
	if opts.MovingGC {
		gcOpts = append(gcOpts, bstore.WithFullGC(true))
	}
	doGC := func() error { return s.gcBlockstore(s.cold, gcOpts) }

	var retainStateP func(int64) bool
	switch {
	case retainState > 0:
		retainStateP = func(depth int64) bool {
			return depth <= int64(CompactionBoundary)+retainState*int64(build.Finality)
		}
	case retainState < 0:
		retainStateP = func(_ int64) bool { return true }
	default:
		retainStateP = func(depth int64) bool {
			return depth <= int64(CompactionBoundary)
		}
	}

	if _, ok := s.cold.(bstore.BlockstoreIterator); !ok {
		return xerrors.Errorf("coldstore does not support efficient iteration")
	}

	return s.pruneChain(retainStateP, doGC)
}
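To make the retention predicate concrete: with RetainState = 2, a state root at depth d epochs behind the head is retained iff d <= CompactionBoundary + 2*Finality. A hedged worked example, assuming the mainnet values at the time of this commit (CompactionBoundary = 4*Finality, Finality = 900 epochs, 30s per epoch):

	// retain iff depth <= 4*900 + 2*900 = 5400 epochs (~45 hours of chain)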
func (s *SplitStore) pruneChain(retainStateP func(int64) bool, doGC func() error) error {
	// inhibit compaction while we are setting up
	s.headChangeMx.Lock()
	defer s.headChangeMx.Unlock()

	// take the compaction lock; fail if there is a compaction in progress
	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
		return xerrors.Errorf("compaction, prune or warmup in progress")
	}

	// check if we are actually closing first
	if atomic.LoadInt32(&s.closing) == 1 {
		atomic.StoreInt32(&s.compacting, 0)
		return errClosing
	}

	// ensure that we have compacted at least once
	if s.compactionIndex == 0 {
		atomic.StoreInt32(&s.compacting, 0)
		return xerrors.Errorf("splitstore has not compacted yet")
	}

	// get the current tipset
	curTs := s.chain.GetHeaviestTipSet()

	// begin the transaction and go
	s.beginTxnProtect()
	s.compactType = cold
	go func() {
		defer atomic.StoreInt32(&s.compacting, 0)
		defer s.endTxnProtect()

		log.Info("pruning splitstore")
		start := time.Now()

		s.prune(curTs, retainStateP, doGC)

		log.Infow("prune done", "took", time.Since(start))
	}()

	return nil
}

func (s *SplitStore) prune(curTs *types.TipSet, retainStateP func(int64) bool, doGC func() error) {
	log.Debug("waiting for active views to complete")
	start := time.Now()
	s.viewWait()
	log.Debugw("waiting for active views done", "took", time.Since(start))

	err := s.doPrune(curTs, retainStateP, doGC)
	if err != nil {
		log.Errorf("PRUNE ERROR: %s", err)
	}
}

func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool, doGC func() error) error {
	currentEpoch := curTs.Height()
	log.Infow("running prune", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch)

	markSet, err := s.markSetEnv.New("live", s.markSetSize)
	if err != nil {
		return xerrors.Errorf("error creating mark set: %w", err)
	}
	defer markSet.Close() //nolint:errcheck
	defer s.debug.Flush()

	if err := s.checkClosing(); err != nil {
		return err
	}

	// 0. track all protected references at beginning of compaction; anything added later should
	//    be transactionally protected by the write
	log.Info("protecting references with registered protectors")
	err = s.applyProtectors()
	if err != nil {
		return err
	}

	// 1. mark reachable objects by walking the chain from the current epoch; we keep all messages
	//    and chain headers; state and receipts are retained only if they are within retention policy scope
	log.Info("marking reachable objects")
	startMark := time.Now()

	count := new(int64)
	err = s.walkChainDeep(curTs, retainStateP,
		func(c cid.Cid) error {
			if isUnitaryObject(c) {
				return errStopWalk
			}

			mark, err := markSet.Has(c)
			if err != nil {
				return xerrors.Errorf("error checking markset: %w", err)
			}

			if mark {
				return errStopWalk
			}

			atomic.AddInt64(count, 1)
			return markSet.Mark(c)
		})

	if err != nil {
		return xerrors.Errorf("error marking: %w", err)
	}

	log.Infow("marking done", "took", time.Since(startMark), "marked", count)

	if err := s.checkClosing(); err != nil {
		return err
	}

	// 1.1 protect transactional refs
	err = s.protectTxnRefs(markSet)
	if err != nil {
		return xerrors.Errorf("error protecting transactional refs: %w", err)
	}

	if err := s.checkClosing(); err != nil {
		return err
	}

	// 2. iterate through the coldstore to collect dead objects
	log.Info("collecting dead objects")
	startCollect := time.Now()

	deadw, err := NewColdSetWriter(s.deadSetPath())
	if err != nil {
		return xerrors.Errorf("error creating deadset: %w", err)
	}
	defer deadw.Close() //nolint:errcheck

	// some stats for logging
	var liveCnt, deadCnt int

	err = s.cold.(bstore.BlockstoreIterator).ForEachKey(func(c cid.Cid) error {
		// was it marked?
		mark, err := markSet.Has(c)
		if err != nil {
			return xerrors.Errorf("error checking mark set for %s: %w", c, err)
		}

		if mark {
			liveCnt++
			return nil
		}

		// Note: a possible optimization here is to also purge objects that are in the hotstore
		// but this needs special care not to purge genesis state, so we don't bother (yet)

		// it's dead in the coldstore, mark it as a candidate for purge
		if err := deadw.Write(c); err != nil {
			return xerrors.Errorf("error writing cid to deadset: %w", err)
		}
		deadCnt++

		return nil
	})

	if err != nil {
		return xerrors.Errorf("error collecting dead objects: %w", err)
	}

	if err := deadw.Close(); err != nil {
		return xerrors.Errorf("error closing deadset: %w", err)
	}

	stats.Record(s.ctx, metrics.SplitstoreCompactionDead.M(int64(deadCnt)))

	log.Infow("dead collection done", "took", time.Since(startCollect))
	log.Infow("prune stats", "live", liveCnt, "dead", deadCnt)

	if err := s.checkClosing(); err != nil {
		return err
	}

	// now that we have collected dead objects, check for missing references from transactional i/o
	// this is carried over from hot compaction for completeness
	s.waitForMissingRefs(markSet)

	if err := s.checkClosing(); err != nil {
		return err
	}

	deadr, err := NewColdSetReader(s.deadSetPath())
	if err != nil {
		return xerrors.Errorf("error opening deadset: %w", err)
	}
	defer deadr.Close() //nolint:errcheck

	// 3. Purge dead objects with checkpointing for recovery.
	//    This is the critical section of prune, whereby any dead object not in the markSet is
	//    considered already deleted.
	//    We delete dead objects in batches, holding the transaction lock, where we check the markSet
	//    again for new references created by the caller.
	//    After each batch we write a checkpoint to disk; if the process is interrupted before completion,
	//    it will continue from the checkpoint in the next recovery.
	if err := s.beginCriticalSection(markSet); err != nil {
		return xerrors.Errorf("error beginning critical section: %w", err)
	}

	if err := s.checkClosing(); err != nil {
		return err
	}

	checkpoint, err := NewCheckpoint(s.pruneCheckpointPath())
	if err != nil {
		return xerrors.Errorf("error creating checkpoint: %w", err)
	}
	defer checkpoint.Close() //nolint:errcheck

	log.Info("purging dead objects from the coldstore")
	startPurge := time.Now()
	err = s.purge(deadr, checkpoint, markSet)
	if err != nil {
		return xerrors.Errorf("error purging dead objects: %w", err)
	}
	log.Infow("purging dead objects from coldstore done", "took", time.Since(startPurge))

	s.endCriticalSection()

	if err := checkpoint.Close(); err != nil {
		log.Warnf("error closing checkpoint: %s", err)
	}
	if err := os.Remove(s.pruneCheckpointPath()); err != nil {
		log.Warnf("error removing checkpoint: %s", err)
	}
	if err := deadr.Close(); err != nil {
		log.Warnf("error closing deadset: %s", err)
	}
	if err := os.Remove(s.deadSetPath()); err != nil {
		log.Warnf("error removing deadset: %s", err)
	}

	// we are done; do some housekeeping
	s.endTxnProtect()
	err = doGC()
	if err != nil {
		log.Warnf("error garbage collecting cold store: %s", err)
	}

	s.pruneIndex++
	err = s.ds.Put(s.ctx, pruneIndexKey, int64ToBytes(s.pruneIndex))
	if err != nil {
		return xerrors.Errorf("error saving prune index: %w", err)
	}

	return nil
}

func (s *SplitStore) completePrune() error {
	checkpoint, last, err := OpenCheckpoint(s.pruneCheckpointPath())
	if err != nil {
		return xerrors.Errorf("error opening checkpoint: %w", err)
	}
	defer checkpoint.Close() //nolint:errcheck

	deadr, err := NewColdSetReader(s.deadSetPath())
	if err != nil {
		return xerrors.Errorf("error opening deadset: %w", err)
	}
	defer deadr.Close() //nolint:errcheck

	markSet, err := s.markSetEnv.Recover("live")
	if err != nil {
		return xerrors.Errorf("error recovering markset: %w", err)
	}
	defer markSet.Close() //nolint:errcheck

	// PURGE!
	s.compactType = cold
	log.Info("purging dead objects from the coldstore")
	startPurge := time.Now()
	err = s.completePurge(deadr, checkpoint, last, markSet)
	if err != nil {
		return xerrors.Errorf("error purging dead objects: %w", err)
	}
	log.Infow("purging dead objects from the coldstore done", "took", time.Since(startPurge))

	markSet.EndCriticalSection()
	s.compactType = none

	if err := checkpoint.Close(); err != nil {
		log.Warnf("error closing checkpoint: %s", err)
	}
	if err := os.Remove(s.pruneCheckpointPath()); err != nil {
		log.Warnf("error removing checkpoint: %s", err)
	}
	if err := deadr.Close(); err != nil {
		log.Warnf("error closing deadset: %s", err)
	}
	if err := os.Remove(s.deadSetPath()); err != nil {
		log.Warnf("error removing deadset: %s", err)
	}

	return nil
}

// like walkChain but performs a deep walk, using parallel walking with walkObjectLax,
// whereby all extant messages are retained and state roots are retained if they satisfy
// the given predicate.
// missing references are ignored, as we expect to have plenty for snapshot syncs.
func (s *SplitStore) walkChainDeep(ts *types.TipSet, retainStateP func(int64) bool,
	f func(cid.Cid) error) error {
	visited := cid.NewSet()
	toWalk := ts.Cids()
	walkCnt := 0

	workers := runtime.NumCPU() / 2
	if workers < 2 {
		workers = 2
	}

	var wg sync.WaitGroup
	workch := make(chan cid.Cid, 16*workers)
	errch := make(chan error, workers)

	var once sync.Once
	defer once.Do(func() { close(workch) })

	push := func(c cid.Cid) error {
		if !visited.Visit(c) {
			return nil
		}

		select {
		case workch <- c:
			return nil
		case err := <-errch:
			return err
		}
	}

	worker := func() {
		defer wg.Done()
		for c := range workch {
			err := s.walkObjectLax(c, f)
			if err != nil {
				errch <- xerrors.Errorf("error walking object (cid: %s): %w", c, err)
				return
			}
		}
	}

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go worker()
	}

	baseEpoch := ts.Height()
	minEpoch := baseEpoch // for progress report
	log.Infof("walking at epoch %d", minEpoch)

	walkBlock := func(c cid.Cid) error {
		if !visited.Visit(c) {
			return nil
		}

		walkCnt++

		if err := f(c); err != nil {
			return err
		}

		var hdr types.BlockHeader
		err := s.view(c, func(data []byte) error {
			return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
		})

		if err != nil {
			return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
		}

		if hdr.Height < minEpoch {
			minEpoch = hdr.Height
			if minEpoch%10_000 == 0 {
				log.Infof("walking at epoch %d (walked: %d)", minEpoch, walkCnt)
			}
		}

		depth := int64(baseEpoch - hdr.Height)
		retainState := retainStateP(depth)

		if hdr.Height > 0 {
			if err := push(hdr.Messages); err != nil {
				return err
			}
			if retainState {
				if err := push(hdr.ParentMessageReceipts); err != nil {
					return err
				}
			}
		}

		if retainState || hdr.Height == 0 {
			if err := push(hdr.ParentStateRoot); err != nil {
				return err
			}
		}

		if hdr.Height > 0 {
			toWalk = append(toWalk, hdr.Parents...)
		}

		return nil
	}

	for len(toWalk) > 0 {
		// walking can take a while, so check this with every opportunity
		if err := s.checkClosing(); err != nil {
			return err
		}

		select {
		case err := <-errch:
			return err
		default:
		}

		walking := toWalk
		toWalk = nil
		for _, c := range walking {
			if err := walkBlock(c); err != nil {
				return xerrors.Errorf("error walking block (cid: %s): %w", c, err)
			}
		}
	}

	once.Do(func() { close(workch) })
	wg.Wait()
	select {
	case err := <-errch:
		return err
	default:
	}

	log.Infow("chain walk done", "walked", walkCnt)

	return nil
}

// like walkObject but treats missing references laxly; a faster version of walkObjectIncomplete
// without an occurs check.
func (s *SplitStore) walkObjectLax(c cid.Cid, f func(cid.Cid) error) error {
	if err := f(c); err != nil {
		if err == errStopWalk {
			return nil
		}

		return err
	}

	if c.Prefix().Codec != cid.DagCBOR {
		return nil
	}

	// check this before recursing
	if err := s.checkClosing(); err != nil {
		return err
	}

	var links []cid.Cid
	err := s.view(c, func(data []byte) error {
		return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
			links = append(links, c)
		})
	})

	if err != nil {
		if ipld.IsNotFound(err) { // not a problem for deep walks
			return nil
		}

		return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
	}

	for _, c := range links {
		err := s.walkObjectLax(c, f)
		if err != nil {
			return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
		}
	}

	return nil
}
@@ -83,7 +83,7 @@ func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
}

func (s *SplitStore) doReify(c cid.Cid) {
-	var toreify, totrack, toforget []cid.Cid
+	var toreify, toforget []cid.Cid

	defer func() {
		s.reifyMx.Lock()

@@ -92,9 +92,6 @@ func (s *SplitStore) doReify(c cid.Cid) {
		for _, c := range toreify {
			delete(s.reifyInProgress, c)
		}
-		for _, c := range totrack {
-			delete(s.reifyInProgress, c)
-		}
		for _, c := range toforget {
			delete(s.reifyInProgress, c)
		}

@@ -131,19 +128,10 @@ func (s *SplitStore) doReify(c cid.Cid) {
			return xerrors.Errorf("error checking hotstore: %w", err)
		}

+		// All reified blocks are tracked at reification start
		if has {
-			if s.txnMarkSet != nil {
-				hasMark, err := s.txnMarkSet.Has(c)
-				if err != nil {
-					log.Warnf("error checking markset: %s", err)
-				} else if hasMark {
-					toforget = append(toforget, c)
-					return errStopWalk
-				}
-			} else {
-				totrack = append(totrack, c)
-				return errStopWalk
-			}
+			toforget = append(toforget, c)
+			return errStopWalk
		}

		toreify = append(toreify, c)

@@ -155,7 +143,7 @@ func (s *SplitStore) doReify(c cid.Cid) {
	})

	if err != nil {
-		if xerrors.Is(err, errReifyLimit) {
+		if errors.Is(err, errReifyLimit) {
			log.Debug("reification aborted; reify limit reached")
			return
		}

@@ -190,24 +178,4 @@ func (s *SplitStore) doReify(c cid.Cid) {
		}
	}

-	if s.txnMarkSet != nil {
-		if len(toreify) > 0 {
-			if err := s.txnMarkSet.MarkMany(toreify); err != nil {
-				log.Warnf("error marking reified objects: %s", err)
-			}
-		}
-		if len(totrack) > 0 {
-			if err := s.txnMarkSet.MarkMany(totrack); err != nil {
-				log.Warnf("error marking tracked objects: %s", err)
-			}
-		}
-	} else {
-		// if txnActive is false these are noops
-		if len(toreify) > 0 {
-			s.trackTxnRefMany(toreify)
-		}
-		if len(totrack) > 0 {
-			s.trackTxnRefMany(totrack)
-		}
-	}
}
@@ -21,14 +21,14 @@ var (
	WarmupBoundary = build.Finality
)

-// warmup acuiqres the compaction lock and spawns a goroutine to warm up the hotstore;
+// warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
// this is necessary when we sync from a snapshot or when we enable the splitstore
// on top of an existing blockstore (which becomes the coldstore).
func (s *SplitStore) warmup(curTs *types.TipSet) error {
	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
		return xerrors.Errorf("error locking compaction")
	}

	s.compactType = warmup
	go func() {
		defer atomic.StoreInt32(&s.compacting, 0)
@@ -176,11 +176,12 @@ func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, e
	defer t.mu.RUnlock()

	ch := make(chan cid.Cid, len(t.active)+len(t.inactive))
-	for c := range t.active {
-		ch <- c
+	for _, b := range t.active {
+		ch <- b.Cid()
	}
-	for c := range t.inactive {
-		if _, ok := t.active[c]; ok {
+	for _, b := range t.inactive {
+		c := b.Cid()
+		if _, ok := t.active[string(c.Hash())]; ok {
			continue
		}
		ch <- c
Binary files not shown.
cli/chain.go (46 additions)
@@ -1462,3 +1462,49 @@ func createExportFile(app *cli.App, path string) (io.WriteCloser, error) {
	}
	return fi, nil
}

var ChainPruneCmd = &cli.Command{
	Name:  "prune",
	Usage: "prune the stored chain state and perform garbage collection",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name:  "online-gc",
			Value: false,
			Usage: "use online gc for garbage collecting the coldstore",
		},
		&cli.BoolFlag{
			Name:  "moving-gc",
			Value: false,
			Usage: "use moving gc for garbage collecting the coldstore",
		},
		&cli.StringFlag{
			Name:  "move-to",
			Value: "",
			Usage: "specify new path for coldstore during moving gc",
		},
		&cli.IntFlag{
			Name:  "retention",
			Value: -1,
			Usage: "specify state retention policy",
		},
	},
	Action: func(cctx *cli.Context) error {
		api, closer, err := GetFullNodeAPIV1(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := ReqContext(cctx)

		opts := lapi.PruneOpts{}
		if cctx.Bool("online-gc") {
			opts.MovingGC = false
		}
		if cctx.Bool("moving-gc") {
			opts.MovingGC = true
		}
		opts.RetainState = int64(cctx.Int("retention"))

		return api.ChainPrune(ctx, opts)
	},
}
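Assuming the command is registered under the chain command group (the registration is not shown in this diff), usage would look roughly like:

	lotus chain prune --retention -1
	lotus chain prune --moving-gc --move-to /new/coldstore --retention 1

Note that `--move-to` is parsed but not yet consulted by the Action above; only the gc mode and retention value are forwarded in PruneOpts.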
@@ -28,6 +28,7 @@
* [ChainHasObj](#ChainHasObj)
* [ChainHead](#ChainHead)
* [ChainNotify](#ChainNotify)
* [ChainPrune](#ChainPrune)
* [ChainPutObj](#ChainPutObj)
* [ChainReadObj](#ChainReadObj)
* [ChainSetHead](#ChainSetHead)

@@ -962,6 +963,25 @@ Response:
]
```

### ChainPrune
ChainPrune prunes the stored chain state and garbage collects; only supported if you
are using the splitstore


Perms: admin

Inputs:
```json
[
  {
    "MovingGC": true,
    "RetainState": 9
  }
]
```

Response: `{}`
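For illustration, this maps to the `Filecoin.ChainPrune` JSON-RPC method on the v1 endpoint; a hedged example call (address, port, and token are placeholder assumptions following standard Lotus conventions):

	curl -X POST http://127.0.0.1:1234/rpc/v1 \
	  -H "Content-Type: application/json" \
	  -H "Authorization: Bearer $ADMIN_TOKEN" \
	  -d '{"jsonrpc":"2.0","method":"Filecoin.ChainPrune","params":[{"MovingGC":false,"RetainState":-1}],"id":1}'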
### ChainPutObj
ChainPutObj puts a given object into the block store
@@ -29,15 +29,19 @@ type BlockMiner struct {
	miner *TestMiner

	nextNulls int64
	pause     chan struct{}
	unpause   chan struct{}
	wg        sync.WaitGroup
	cancel    context.CancelFunc
}

func NewBlockMiner(t *testing.T, miner *TestMiner) *BlockMiner {
	return &BlockMiner{
-		t:      t,
-		miner:  miner,
-		cancel: func() {},
+		t:       t,
+		miner:   miner,
+		cancel:  func() {},
+		unpause: make(chan struct{}),
+		pause:   make(chan struct{}),
	}
}

@@ -255,6 +259,18 @@ func (bm *BlockMiner) MineBlocks(ctx context.Context, blocktime time.Duration) {
	defer bm.wg.Done()

	for {
		select {
		case <-bm.pause:
			select {
			case <-bm.unpause:
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		default:
		}

		select {
		case <-time.After(blocktime):
		case <-ctx.Done():

@@ -283,6 +299,16 @@ func (bm *BlockMiner) InjectNulls(rounds abi.ChainEpoch) {
	atomic.AddInt64(&bm.nextNulls, int64(rounds))
}

// Pause compels the miner to wait for a signal to restart
func (bm *BlockMiner) Pause() {
	bm.pause <- struct{}{}
}

// Restart continues mining after a pause. This will hang if called before Pause.
func (bm *BlockMiner) Restart() {
	bm.unpause <- struct{}{}
}

func (bm *BlockMiner) MineUntilBlock(ctx context.Context, fn *TestFullNode, cb func(abi.ChainEpoch)) {
	for i := 0; i < 1000; i++ {
		var (

@@ -335,4 +361,12 @@ func (bm *BlockMiner) Stop() {
	bm.t.Log("shutting down mining")
	bm.cancel()
	bm.wg.Wait()
	if bm.unpause != nil {
		close(bm.unpause)
		bm.unpause = nil
	}
	if bm.pause != nil {
		close(bm.pause)
		bm.pause = nil
	}
}
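The new Pause/Restart hooks let a test stop block production at a quiescent point. The splitstore itests below use the following polling pattern (restated here for reference) to pause only when no compaction is in flight:

	for {
		bm.Pause()
		if splitStoreCompacting(ctx, t, full) {
			bm.Restart()
			time.Sleep(time.Second)
		} else {
			break
		}
	}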
@@ -341,7 +341,43 @@ func (n *Ensemble) Start() *Ensemble {

	// Create all inactive full nodes.
	for i, full := range n.inactive.fullnodes {
-		r := repo.NewMemory(nil)
+		var r repo.Repo
+		if !full.options.fsrepo {
+			rmem := repo.NewMemory(nil)
+			n.t.Cleanup(rmem.Cleanup)
+			r = rmem
+		} else {
+			repoPath := n.t.TempDir()
+			rfs, err := repo.NewFS(repoPath)
+			require.NoError(n.t, err)
+			require.NoError(n.t, rfs.Init(repo.FullNode))
+			r = rfs
+		}
+
+		// setup config with options
+		lr, err := r.Lock(repo.FullNode)
+		require.NoError(n.t, err)
+
+		c, err := lr.Config()
+		require.NoError(n.t, err)
+
+		cfg, ok := c.(*config.FullNode)
+		if !ok {
+			n.t.Fatalf("invalid config from repo, got: %T", c)
+		}
+		for _, opt := range full.options.cfgOpts {
+			require.NoError(n.t, opt(cfg))
+		}
+		err = lr.SetConfig(func(raw interface{}) {
+			rcfg := raw.(*config.FullNode)
+			*rcfg = *cfg
+		})
+		require.NoError(n.t, err)
+
+		err = lr.Close()
+		require.NoError(n.t, err)

		opts := []node.Option{
			node.FullAPI(&full.FullNode, node.Lite(full.options.lite)),
			node.Base(),

@@ -396,7 +432,10 @@ func (n *Ensemble) Start() *Ensemble {
			n.inactive.fullnodes[i] = withRPC
		}

-		n.t.Cleanup(func() { _ = stop(context.Background()) })
+		n.t.Cleanup(func() {
+			_ = stop(context.Background())
+		})

		n.active.fullnodes = append(n.active.fullnodes, full)
	}
@@ -33,6 +33,8 @@ type nodeOpts struct {
	rpc           bool
	ownerKey      *key.Key
	extraNodeOpts []node.Option
	cfgOpts       []CfgOption
	fsrepo        bool

	subsystems MinerSubsystem
	mainMiner  *TestMiner

@@ -235,3 +237,43 @@ func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt {
		return nil
	}
}

func FsRepo() NodeOpt {
	return func(opts *nodeOpts) error {
		opts.fsrepo = true
		return nil
	}
}

func WithCfgOpt(opt CfgOption) NodeOpt {
	return func(opts *nodeOpts) error {
		opts.cfgOpts = append(opts.cfgOpts, opt)
		return nil
	}
}

type CfgOption func(cfg *config.FullNode) error

func SplitstoreDiscard() NodeOpt {
	return WithCfgOpt(func(cfg *config.FullNode) error {
		//cfg.Chainstore.Splitstore.HotStoreType = "badger"      // default
		//cfg.Chainstore.Splitstore.MarkSetType = "badger"       // default
		//cfg.Chainstore.Splitstore.HotStoreMessageRetention = 0 // default
		cfg.Chainstore.EnableSplitstore = true
		cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc
		cfg.Chainstore.Splitstore.ColdStoreType = "discard"   // no cold store
		return nil
	})
}

func SplitstoreUniversal() NodeOpt {
	return WithCfgOpt(func(cfg *config.FullNode) error {
		//cfg.Chainstore.Splitstore.HotStoreType = "badger"      // default
		//cfg.Chainstore.Splitstore.MarkSetType = "badger"       // default
		//cfg.Chainstore.Splitstore.HotStoreMessageRetention = 0 // default
		cfg.Chainstore.EnableSplitstore = true
		cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc
		cfg.Chainstore.Splitstore.ColdStoreType = "universal" // universal bs is coldstore
		return nil
	})
}
itests/splitstore_test.go (new file, 354 lines)
@@ -0,0 +1,354 @@
|
||||
//stm: #integration
|
||||
package itests
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/builtin"
|
||||
miner8 "github.com/filecoin-project/go-state-types/builtin/v8/miner"
|
||||
"github.com/filecoin-project/go-state-types/exitcode"
|
||||
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
|
||||
power6 "github.com/filecoin-project/specs-actors/v6/actors/builtin/power"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
lapi "github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/blockstore/splitstore"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
)
|
||||
|
||||
// Startup a node with hotstore and discard coldstore. Compact once and return
|
||||
func TestHotstoreCompactsOnce(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
// disable sync checking because efficient itests require that the node is out of sync : /
|
||||
splitstore.CheckSyncGap = false
|
||||
opts := []interface{}{kit.MockProofs(), kit.SplitstoreDiscard()}
|
||||
full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...)
|
||||
bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]
|
||||
_ = full
|
||||
_ = genesisMiner
|
||||
_ = bm
|
||||
|
||||
waitForCompaction(ctx, t, 1, full)
|
||||
require.NoError(t, genesisMiner.Stop(ctx))
|
||||
}
|
||||
|
||||
// create some unreachable state
|
||||
// and check that compaction carries it away
|
||||
func TestHotstoreCompactCleansGarbage(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
// disable sync checking because efficient itests require that the node is out of sync : /
|
||||
splitstore.CheckSyncGap = false
|
||||
opts := []interface{}{kit.MockProofs(), kit.SplitstoreDiscard()}
|
||||
full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...)
|
||||
bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]
|
||||
_ = full
|
||||
_ = genesisMiner
|
||||
|
||||
// create garbage
|
||||
g := NewGarbager(ctx, t, full)
|
||||
garbage, e := g.Drop(ctx)
|
||||
|
||||
// calculate next compaction where we should actually see cleanup
|
||||
|
||||
// pause, check for compacting and get compaction info
|
||||
// we do this to remove the (very unlikely) race where compaction index
|
||||
// and compaction epoch are in the middle of update, or a whole compaction
|
||||
// runs between the two
|
||||
for {
|
||||
bm.Pause()
|
||||
if splitStoreCompacting(ctx, t, full) {
|
||||
bm.Restart()
|
||||
time.Sleep(3 * time.Second)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
lastCompactionEpoch := splitStoreBaseEpoch(ctx, t, full)
|
||||
garbageCompactionIndex := splitStoreCompactionIndex(ctx, t, full) + 1
|
||||
boundary := lastCompactionEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary
|
||||
|
||||
for e > boundary {
|
||||
boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary
|
||||
garbageCompactionIndex++
|
||||
}
|
||||
bm.Restart()
|
||||
|
||||
// wait for compaction to occur
|
||||
waitForCompaction(ctx, t, garbageCompactionIndex, full)
|
||||
|
||||
// check that garbage is cleaned up
|
||||
assert.False(t, g.Exists(ctx, garbage), "Garbage still exists in blockstore")
|
||||
}

// Create unreachable state
// Check that it moves to coldstore
// Prune coldstore and check that it is deleted
func TestColdStorePrune(t *testing.T) {
	ctx := context.Background()
	// disable sync checking because efficient itests require that the node is out of sync : /
	splitstore.CheckSyncGap = false
	opts := []interface{}{kit.MockProofs(), kit.SplitstoreUniversal(), kit.FsRepo()}
	full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...)
	bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]
	_ = full
	_ = genesisMiner

	// create garbage
	g := NewGarbager(ctx, t, full)
	garbage, e := g.Drop(ctx)
	assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore")

	// calculate next compaction where we should actually see cleanup

	// pause, check for compacting and get compaction info
	// we do this to remove the (very unlikely) race where the compaction index
	// and compaction epoch are read mid-update, or a whole compaction runs
	// between reading the two
	for {
		bm.Pause()
		if splitStoreCompacting(ctx, t, full) {
			bm.Restart()
			time.Sleep(3 * time.Second)
		} else {
			break
		}
	}
	lastCompactionEpoch := splitStoreBaseEpoch(ctx, t, full)
	garbageCompactionIndex := splitStoreCompactionIndex(ctx, t, full) + 1
	boundary := lastCompactionEpoch + splitstore.CompactionThreshold - splitstore.CompactionBoundary

	for e > boundary {
		boundary += splitstore.CompactionThreshold - splitstore.CompactionBoundary
		garbageCompactionIndex++
	}
	bm.Restart()

	// wait for compaction to occur
	waitForCompaction(ctx, t, garbageCompactionIndex, full)

	bm.Pause()

	// This data should now be moved to the coldstore.
	// Access it without a hot view to keep it there while checking that it still exists.
	// Only state compute uses the hot view, so the garbager's Exists, backed by ChainReadObj, is fine.
	assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore")
	bm.Restart()

	// wait for compaction to finish, and pause to make sure another one doesn't start, to avoid racing
	for {
		bm.Pause()
		if splitStoreCompacting(ctx, t, full) {
			bm.Restart()
			time.Sleep(1 * time.Second)
		} else {
			break
		}
	}
	pruneOpts := api.PruneOpts{RetainState: int64(0), MovingGC: false}
	require.NoError(t, full.ChainPrune(ctx, pruneOpts))
	bm.Restart()
	waitForPrune(ctx, t, 1, full)
	assert.False(g.t, g.Exists(ctx, garbage), "Garbage should be removed from cold store after prune but it's still there")
}
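The test drives the simplest configuration: in-place delete with no retained state. A caller wanting the other mode would presumably flip the two PruneOpts knobs; a hedged sketch, assuming the semantics suggested by the field names (MovingGC selects a moving collection, RetainState keeps some recent state in the coldstore):

	// Sketch only, not part of this test: moving GC, retaining two states.
	pruneOpts := api.PruneOpts{RetainState: int64(2), MovingGC: true}
	require.NoError(t, full.ChainPrune(ctx, pruneOpts))
	waitForPrune(ctx, t, 1, full)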

func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) {
	for {
		if splitStoreCompactionIndex(ctx, t, n) >= cIdx {
			break
		}
		time.Sleep(1 * time.Second)
	}
}

func waitForPrune(ctx context.Context, t *testing.T, pIdx int64, n *kit.TestFullNode) {
	for {
		if splitStorePruneIndex(ctx, t, n) >= pIdx {
			break
		}
		time.Sleep(1 * time.Second)
	}
}

func splitStoreCompacting(ctx context.Context, t *testing.T, n *kit.TestFullNode) bool {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)
	compactingRaw, ok := info["compacting"]
	require.True(t, ok, "'compacting' not on blockstore info")
	compacting, ok := compactingRaw.(bool)
	require.True(t, ok, "compacting key on blockstore info wrong type")
	return compacting
}

func splitStoreBaseEpoch(ctx context.Context, t *testing.T, n *kit.TestFullNode) abi.ChainEpoch {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)
	baseRaw, ok := info["base epoch"]
	require.True(t, ok, "'base epoch' not on blockstore info")
	base, ok := baseRaw.(abi.ChainEpoch)
	require.True(t, ok, "base epoch key on blockstore info wrong type")
	return base
}

func splitStoreCompactionIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode) int64 {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)
	compact, ok := info["compactions"]
	require.True(t, ok, "'compactions' not on blockstore info")
	compactionIndex, ok := compact.(int64)
	require.True(t, ok, "compaction key on blockstore info wrong type")
	return compactionIndex
}

func splitStorePruneIndex(ctx context.Context, t *testing.T, n *kit.TestFullNode) int64 {
	info, err := n.ChainBlockstoreInfo(ctx)
	require.NoError(t, err)
	prune, ok := info["prunes"]
	require.True(t, ok, "'prunes' not on blockstore info")
	pruneIndex, ok := prune.(int64)
	require.True(t, ok, "prune key on blockstore info wrong type")
	return pruneIndex
}
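The four accessors above repeat the same lookup-and-assert dance against ChainBlockstoreInfo. A possible consolidation, sketched here rather than part of this diff, and assuming the module builds with Go 1.18+ generics:

	// blockstoreInfoField is a hypothetical helper, not in this change: it
	// fetches one typed field from ChainBlockstoreInfo and fails the test if
	// the key is missing or has the wrong dynamic type.
	func blockstoreInfoField[T any](ctx context.Context, t *testing.T, n *kit.TestFullNode, key string) T {
		info, err := n.ChainBlockstoreInfo(ctx)
		require.NoError(t, err)
		raw, ok := info[key]
		require.True(t, ok, "%s not on blockstore info", key)
		v, ok := raw.(T)
		require.True(t, ok, "%s key on blockstore info wrong type", key)
		return v
	}

With that, splitStorePruneIndex would reduce to a one-liner such as blockstoreInfoField[int64](ctx, t, n, "prunes").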

// Garbager creates unreachable garbage on chain for a network, to exercise
// the splitstore; one garbage cid is created at a time.
//
// It works by rewriting an internally maintained miner actor's peer ID.
type Garbager struct {
	t      *testing.T
	node   *kit.TestFullNode
	latest trashID

	// internal tracking
	maddr4Data address.Address
}

type trashID uint8

func NewGarbager(ctx context.Context, t *testing.T, n *kit.TestFullNode) *Garbager {
	// create miner actor for writing garbage

	g := &Garbager{
		t:          t,
		node:       n,
		latest:     0,
		maddr4Data: address.Undef,
	}
	g.createMiner(ctx)
	g.newPeerID(ctx)
	return g
}

// Drop returns the cid referencing the dropped garbage and the chain epoch of the drop
func (g *Garbager) Drop(ctx context.Context) (cid.Cid, abi.ChainEpoch) {
	// record the existing miner info cid with mInfoCid
	c := g.mInfoCid(ctx)

	// update trashID and create a new peer ID, dropping miner info cid c in the process;
	// wait for the message and return the chain height that the drop occurred at
	g.latest++
	return c, g.newPeerID(ctx)
}

// Exists checks whether the cid is reachable through the node
func (g *Garbager) Exists(ctx context.Context, c cid.Cid) bool {
	// check chain get / blockstore get
	_, err := g.node.ChainReadObj(ctx, c)
	if ipld.IsNotFound(err) {
		return false
	}
	if err != nil {
		g.t.Fatalf("ChainReadObj failure on existence check: %s", err)
	}
	return true
}

func (g *Garbager) newPeerID(ctx context.Context) abi.ChainEpoch {
	dataStr := fmt.Sprintf("Garbager-Data-%d", g.latest)
	dataID := []byte(dataStr)
	params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: dataID})
	require.NoError(g.t, err)

	msg := &types.Message{
		To:     g.maddr4Data,
		From:   g.node.DefaultKey.Address,
		Method: builtin.MethodsMiner.ChangePeerID,
		Params: params,
		Value:  types.NewInt(0),
	}

	signed, err := g.node.MpoolPushMessage(ctx, msg, nil)
	require.NoError(g.t, err)

	mw, err := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
	require.NoError(g.t, err)
	require.Equal(g.t, exitcode.Ok, mw.Receipt.ExitCode)
	return mw.Height
}

func (g *Garbager) mInfoCid(ctx context.Context) cid.Cid {
	ts, err := g.node.ChainHead(ctx)
	require.NoError(g.t, err)

	act, err := g.node.StateGetActor(ctx, g.maddr4Data, ts.Key())
	require.NoError(g.t, err)
	raw, err := g.node.ChainReadObj(ctx, act.Head)
	require.NoError(g.t, err)
	var mSt miner8.State
	require.NoError(g.t, mSt.UnmarshalCBOR(bytes.NewReader(raw)))

	// return the miner info cid
	return mSt.Info
}

func (g *Garbager) createMiner(ctx context.Context) {
	require.True(g.t, g.maddr4Data == address.Undef, "garbager miner actor already created")
	owner, err := g.node.WalletDefaultAddress(ctx)
	require.NoError(g.t, err)
	worker := owner

	params, err := actors.SerializeParams(&power6.CreateMinerParams{
		Owner:               owner,
		Worker:              worker,
		WindowPoStProofType: abi.RegisteredPoStProof_StackedDrgWindow32GiBV1,
	})
	require.NoError(g.t, err)

	createStorageMinerMsg := &types.Message{
		To:    power.Address,
		From:  worker,
		Value: big.Zero(),

		Method: power.Methods.CreateMiner,
		Params: params,
	}

	signed, err := g.node.MpoolPushMessage(ctx, createStorageMinerMsg, nil)
	require.NoError(g.t, err)
	mw, err := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, lapi.LookbackNoLimit, true)
	require.NoError(g.t, err)
	require.True(g.t, mw.Receipt.ExitCode == 0, "garbager's internal create miner message failed")

	var retval power6.CreateMinerReturn
	require.NoError(g.t, retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return)))
	g.maddr4Data = retval.IDAddress
}
@ -653,3 +653,14 @@ func (a *ChainAPI) ChainBlockstoreInfo(ctx context.Context) (map[string]interfac

	return info.Info(), nil
}

func (a *ChainAPI) ChainPrune(ctx context.Context, opts api.PruneOpts) error {
	pruner, ok := a.BaseBlockstore.(interface {
		PruneChain(opts api.PruneOpts) error
	})
	if !ok {
		return xerrors.Errorf("base blockstore does not support pruning (%T)", a.BaseBlockstore)
	}

	return pruner.PruneChain(opts)
}

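ChainPrune deliberately avoids a hard dependency on the splitstore package: it type-asserts the base blockstore against an anonymous interface, so any blockstore exposing the right method qualifies. A minimal sketch of a conforming type (hypothetical, with a placeholder body; only the signature matters for the assertion above):

	// prunableBlockstore is illustrative, not part of this change.
	type prunableBlockstore struct{}

	func (p *prunableBlockstore) PruneChain(opts api.PruneOpts) error {
		// a real implementation would walk the reachable set from the chain
		// head and delete everything else; this stub only shows conformance.
		return nil
	}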
@ -271,7 +271,11 @@ func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDoma
}

func (lmem *lockedMemRepo) SplitstorePath() (string, error) {
	splitstorePath := filepath.Join(lmem.Path(), "splitstore")
	if err := os.MkdirAll(splitstorePath, 0755); err != nil {
		return "", err
	}
	return splitstorePath, nil
}
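This replaces the previous body, which returned a fresh ioutil.TempDir("", "splitstore.*") on every call, so two calls never agreed on a path. Anchoring the splitstore under the repo root makes the path stable across calls, which the FsRepo-backed itests above rely on. The behavior callers can now assume (sketch; lmem is any locked memrepo instance):

	path, err := lmem.SplitstorePath()
	// err == nil implies the directory exists (MkdirAll is idempotent) and
	// path == filepath.Join(lmem.Path(), "splitstore") on every call.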

func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) {