From c762536dcbcb67dd7aa6929d66ac501c5174fb12 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 2 Mar 2021 11:20:39 +0200 Subject: [PATCH] deduplicate code --- blockstore/splitstore/splitstore.go | 217 +++++++++++----------------- 1 file changed, 88 insertions(+), 129 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 208701d25..9764f508d 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -596,75 +596,29 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { // 2.2 copy the cold objects to the coldstore log.Info("moving cold objects to the coldstore") startMove := time.Now() - - batch := make([]blocks.Block, 0, batchSize) - - for cid := range cold { - blk, err := s.hot.Get(cid) - if err != nil { - if err == dstore.ErrNotFound { - // this can happen if the node is killed after we have deleted the block from the hotstore - // but before we have deleted it from the tracker; just delete the tracker. - err = s.tracker.Delete(cid) - if err != nil { - log.Errorf("error deleting cid %s from tracker: %s", cid, err) - // TODO do something better here -- just continue? - panic(err) - } - } else { - log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err) - // TODO do something better here -- just continue? - panic(err) - } - - continue - } - - batch = append(batch, blk) - if len(batch) == batchSize { - err = s.cold.PutMany(batch) - if err != nil { - log.Errorf("error putting cold batch to coldstore: %s", err) - // TODO do something better here -- just continue? - panic(err) - } - batch = batch[:0] - } - } - - if len(batch) > 0 { - err = s.cold.PutMany(batch) - if err != nil { - log.Errorf("error putting cold batch to coldstore: %s", err) - // TODO do something better here -- just continue? - panic(err) - } + err = s.moveColdBlocks(cold) + if err != nil { + // TODO do something better here + panic(err) } log.Infow("moving done", "took", time.Since(startMove)) // 2.3 delete cold objects from the hotstore - // TODO we really want batching for this! log.Info("purging cold objects from the hotstore") startPurge := time.Now() - for cid := range cold { - // delete the object from the hotstore - err = s.hot.DeleteBlock(cid) - if err != nil { - log.Errorf("error deleting block %s from hotstore: %s", cid, err) - // TODO do something better here -- just continue? - panic(err) - } + err = s.purgeBlocks(cold) + if err != nil { + // TODO do something better here + panic(err) } log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) // 2.4 remove the tracker tracking for cold objects startPurge = time.Now() log.Info("purging cold objects from tracker") - - err = s.tracker.DeleteBatch(cold) + err = s.purgeTracking(cold) if err != nil { - log.Errorf("error purging cold objects from tracker: %s", err) - // TODO do something better here -- just continue? + // TODO do something better here panic(err) } log.Infow("purging cold from tracker done", "took", time.Since(startPurge)) @@ -683,6 +637,68 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) { } } +func (s *SplitStore) moveColdBlocks(cold map[cid.Cid]struct{}) error { + batch := make([]blocks.Block, 0, batchSize) + + for cid := range cold { + blk, err := s.hot.Get(cid) + if err != nil { + if err == dstore.ErrNotFound { + // this can happen if the node is killed after we have deleted the block from the hotstore + // but before we have deleted it from the tracker; just delete the tracker. + err = s.tracker.Delete(cid) + if err != nil { + return xerrors.Errorf("error deleting unreachable cid %s from tracker: %w", cid, err) + } + } else { + return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w", cid, err) + } + + continue + } + + batch = append(batch, blk) + if len(batch) == batchSize { + err = s.cold.PutMany(batch) + if err != nil { + return xerrors.Errorf("error putting batch to coldstore: %w", err) + } + batch = batch[:0] + } + } + + if len(batch) > 0 { + err := s.cold.PutMany(batch) + if err != nil { + return xerrors.Errorf("error putting cold to coldstore: %w", err) + } + } + + return nil +} + +func (s *SplitStore) purgeBlocks(cids map[cid.Cid]struct{}) error { + // TODO batch deletion -- this is very slow with many objects, but we need + // a DeleteBatch method in the blockstore interface + for cid := range cids { + err := s.hot.DeleteBlock(cid) + if err != nil { + return xerrors.Errorf("error deleting block %s from hotstore: %e", cid, err) + } + } + + return nil +} + +func (s *SplitStore) purgeTracking(cids map[cid.Cid]struct{}) error { + err := s.tracker.DeleteBatch(cids) + if err != nil { + return xerrors.Errorf("error deleting batch from tracker: %w", err) + } + + return nil +} + func (s *SplitStore) compactFull(curTs *types.TipSet) { epoch := curTs.Height() coldEpoch := s.baseEpoch + CompactionCold @@ -825,74 +841,29 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { // 2.2 copy the cold objects to the coldstore log.Info("moving cold objects to the coldstore") startMove := time.Now() - - batch := make([]blocks.Block, 0, batchSize) - for cid := range cold { - blk, err := s.hot.Get(cid) - if err != nil { - if err == dstore.ErrNotFound { - // this can happen if the node is killed after we have deleted the block from the hotstore - // but before we have deleted it from the tracker; just delete the tracker. - err = s.tracker.Delete(cid) - if err != nil { - log.Errorf("error deleting cid %s from tracker: %s", cid, err) - // TODO do something better here -- just continue? - panic(err) - } - } else { - log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err) - // TODO do something better here -- just continue? - panic(err) - } - - continue - } - - batch = append(batch, blk) - if len(batch) == batchSize { - err = s.cold.PutMany(batch) - if err != nil { - log.Errorf("error putting cold batch to coldstore: %s", err) - // TODO do something better here -- just continue? - panic(err) - } - batch = batch[:0] - } - } - - if len(batch) > 0 { - err = s.cold.PutMany(batch) - if err != nil { - log.Errorf("error putting cold batch to coldstore: %s", err) - // TODO do something better here -- just continue? - panic(err) - } + err = s.moveColdBlocks(cold) + if err != nil { + // TODO do something better here + panic(err) } log.Infow("moving done", "took", time.Since(startMove)) // 2.3 delete cold objects from the hotstore - // TODO we really want batching for this! log.Info("purging cold objects from the hotstore") startPurge := time.Now() - for cid := range cold { - // delete the object from the hotstore - err = s.hot.DeleteBlock(cid) - if err != nil { - log.Errorf("error deleting block %s from hotstore: %s", cid, err) - // TODO do something better here -- just continue? - panic(err) - } + err = s.purgeBlocks(cold) + if err != nil { + // TODO do something better here + panic(err) } log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) // 2.4 remove the tracker tracking for cold objects startPurge = time.Now() log.Info("purging cold objects from tracker") - - err = s.tracker.DeleteBatch(cold) + err = s.purgeTracking(cold) if err != nil { - log.Errorf("error purging cold objects from tracker: %s", err) - // TODO do something better here -- just continue? + // TODO do something better here panic(err) } log.Infow("purging cold from tracker done", "took", time.Since(startPurge)) @@ -900,32 +871,20 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) { // 3. if we have dead objects, delete them from the hotstore and remove the tracking if len(dead) > 0 { log.Info("deleting dead objects") - - startPurge = time.Now() - log.Info("purging dead objects from the hotstore") - // TODO we really want batching for this! - for cid := range dead { - // delete the object from the hotstore - err = s.hot.DeleteBlock(cid) - if err != nil { - log.Errorf("error deleting block %s from hotstore: %s", cid, err) - // TODO do something better here -- just continue? - panic(err) - } + err = s.purgeBlocks(dead) + if err != nil { + // TODO do something better here + panic(err) } - log.Infow("purging dead from hotstore done", "took", time.Since(startPurge)) // remove the tracker tracking startPurge := time.Now() log.Info("purging dead objects from tracker") - - err = s.tracker.DeleteBatch(dead) + err = s.purgeTracking(dead) if err != nil { - log.Errorf("error purging dead objects from tracker: %s", err) - // TODO do something better here -- just continue? + // TODO do something better here panic(err) } - log.Infow("purging dead from tracker done", "took", time.Since(startPurge)) }