diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 7477fb20d..25b906750 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -530,23 +530,32 @@ func (s *SplitStore) warmup(curTs *types.TipSet) { // Compaction/GC Algorithm func (s *SplitStore) compact(curTs *types.TipSet) { + var err error if s.markSetSize == 0 { start := time.Now() log.Info("estimating mark set size") - s.estimateMarkSetSize(curTs) + err = s.estimateMarkSetSize(curTs) + if err != nil { + log.Errorf("error estimating mark set size: %s; aborting compaction", err) + return + } log.Infow("estimating mark set size done", "took", time.Since(start), "size", s.markSetSize) } else { log.Infow("current mark set size estimate", "size", s.markSetSize) } if s.fullCompaction { - s.compactFull(curTs) + err = s.compactFull(curTs) } else { - s.compactSimple(curTs) + err = s.compactSimple(curTs) + } + + if err != nil { + log.Errorf("COMPACTION ERROR: %s", err) } } -func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) { +func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error { var count int64 err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts, func(cid cid.Cid) error { @@ -555,14 +564,14 @@ func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) { }) if err != nil { - // TODO do something better here - panic(err) + return err } s.markSetSize = count + count>>2 // overestimate a bit + return nil } -func (s *SplitStore) compactSimple(curTs *types.TipSet) { +func (s *SplitStore) compactSimple(curTs *types.TipSet) error { coldEpoch := s.baseEpoch + CompactionCold currentEpoch := curTs.Height() boundaryEpoch := currentEpoch - CompactionBoundary @@ -571,19 +580,17 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { coldSet, err := s.env.Create("cold", s.markSetSize) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error creating mark set: %w", err) } defer coldSet.Close() //nolint:errcheck // 1. mark reachable cold objects by looking at the objects reachable only from the cold epoch - log.Infow("marking reachable cold objects", "boundaryEpoch", boundaryEpoch) + log.Infow("marking reachable cold blocks", "boundaryEpoch", boundaryEpoch) startMark := time.Now() boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error getting tipset at boundary epoch: %w", err) } var count int64 @@ -594,8 +601,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { }) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error marking cold blocks: %w", err) } if count > s.markSetSize { @@ -640,8 +646,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { }) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error collecting cold objects: %w", err) } if coldCnt > 0 { @@ -658,16 +663,15 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { // check to see if we are closing first; if that's the case just return if atomic.LoadInt32(&s.closing) == 1 { log.Info("splitstore is closing; aborting compaction") - return + return xerrors.Errorf("compaction aborted") } // 2.2 copy the cold objects to the coldstore - log.Info("moving cold objects to the coldstore") + log.Info("moving cold blocks to the coldstore") startMove := time.Now() err = s.moveColdBlocks(cold) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error moving cold blocks: %w", err) } log.Infow("moving done", "took", time.Since(startMove)) @@ -676,8 +680,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { startPurge := time.Now() err = s.purgeBlocks(cold) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error purging cold blocks: %w", err) } log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) @@ -686,28 +689,27 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { log.Info("purging cold objects from tracker") err = s.purgeTracking(cold) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error purging tracking for cold blocks: %w", err) } log.Infow("purging cold from tracker done", "took", time.Since(startPurge)) // we are done; do some housekeeping err = s.tracker.Sync() if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error syncing tracker: %w", err) } err = s.setBaseEpoch(coldEpoch) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error saving base epoch: %w", err) } err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize)) if err != nil { - log.Errorf("error saving mark set size: %s", err) + return xerrors.Errorf("error saving mark set size: %w", err) } + + return nil } func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { @@ -782,7 +784,7 @@ func (s *SplitStore) purgeTracking(cids []cid.Cid) error { return s.purgeBatch(cids, s.tracker.DeleteBatch) } -func (s *SplitStore) compactFull(curTs *types.TipSet) { +func (s *SplitStore) compactFull(curTs *types.TipSet) error { currentEpoch := curTs.Height() coldEpoch := s.baseEpoch + CompactionCold boundaryEpoch := currentEpoch - CompactionBoundary @@ -793,27 +795,24 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { // and one for marking the hot region hotSet, err := s.env.Create("hot", s.markSetSize) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error creating hot mark set: %w", err) } defer hotSet.Close() //nolint:errcheck coldSet, err := s.env.Create("cold", s.markSetSize) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error creating cold mark set: %w", err) } defer coldSet.Close() //nolint:errcheck // Phase 1: marking - log.Info("marking live objects") + log.Info("marking live blocks") startMark := time.Now() // Phase 1a: mark all reachable CIDs in the hot range boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error getting tipset at boundary epoch: %w", err) } count := int64(0) @@ -824,8 +823,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { }) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error marking hot blocks: %w", err) } if count > s.markSetSize { @@ -835,8 +833,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { // Phase 1b: mark all reachable CIDs in the cold range coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error getting tipset at cold epoch: %w", err) } count = 0 @@ -847,8 +844,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { }) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error marking cold blocks: %w", err) } if count > s.markSetSize { @@ -921,8 +917,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { }) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error collecting cold objects: %w", err) } if coldCnt > 0 { @@ -942,7 +937,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { // check to see if we are closing first; if that's the case just return if atomic.LoadInt32(&s.closing) == 1 { log.Info("splitstore is closing; aborting compaction") - return + return xerrors.Errorf("compaction aborted") } // 2.2 copy the cold objects to the coldstore @@ -950,8 +945,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { startMove := time.Now() err = s.moveColdBlocks(cold) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error moving cold blocks: %w", err) } log.Infow("moving done", "took", time.Since(startMove)) @@ -960,8 +954,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { startPurge := time.Now() err = s.purgeBlocks(cold) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error purging cold blocks: %w", err) } log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) @@ -970,8 +963,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { log.Info("purging cold objects from tracker") err = s.purgeTracking(cold) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error purging tracking for cold blocks: %w", err) } log.Infow("purging cold from tracker done", "took", time.Since(startPurge)) @@ -980,8 +972,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { log.Info("deleting dead objects") err = s.purgeBlocks(dead) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error purging dead blocks: %w", err) } // remove the tracker tracking @@ -989,8 +980,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { log.Info("purging dead objects from tracker") err = s.purgeTracking(dead) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error purging tracking for dead blocks: %w", err) } log.Infow("purging dead from tracker done", "took", time.Since(startPurge)) } @@ -998,20 +988,20 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { // we are done; do some housekeeping err = s.tracker.Sync() if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error syncing tracker: %w", err) } err = s.setBaseEpoch(coldEpoch) if err != nil { - // TODO do something better here - panic(err) + return xerrors.Errorf("error saving base epoch: %w", err) } err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize)) if err != nil { - log.Errorf("error saving mark set size: %s", err) + return xerrors.Errorf("error saving mark set size: %w", err) } + + return nil } func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {