deduplicate code
This commit is contained in:
parent
5184bc5c40
commit
c762536dcb
@ -596,75 +596,29 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
|||||||
// 2.2 copy the cold objects to the coldstore
|
// 2.2 copy the cold objects to the coldstore
|
||||||
log.Info("moving cold objects to the coldstore")
|
log.Info("moving cold objects to the coldstore")
|
||||||
startMove := time.Now()
|
startMove := time.Now()
|
||||||
|
err = s.moveColdBlocks(cold)
|
||||||
batch := make([]blocks.Block, 0, batchSize)
|
if err != nil {
|
||||||
|
// TODO do something better here
|
||||||
for cid := range cold {
|
panic(err)
|
||||||
blk, err := s.hot.Get(cid)
|
|
||||||
if err != nil {
|
|
||||||
if err == dstore.ErrNotFound {
|
|
||||||
// this can happen if the node is killed after we have deleted the block from the hotstore
|
|
||||||
// but before we have deleted it from the tracker; just delete the tracker.
|
|
||||||
err = s.tracker.Delete(cid)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error deleting cid %s from tracker: %s", cid, err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
batch = append(batch, blk)
|
|
||||||
if len(batch) == batchSize {
|
|
||||||
err = s.cold.PutMany(batch)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error putting cold batch to coldstore: %s", err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
batch = batch[:0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(batch) > 0 {
|
|
||||||
err = s.cold.PutMany(batch)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error putting cold batch to coldstore: %s", err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Infow("moving done", "took", time.Since(startMove))
|
log.Infow("moving done", "took", time.Since(startMove))
|
||||||
|
|
||||||
// 2.3 delete cold objects from the hotstore
|
// 2.3 delete cold objects from the hotstore
|
||||||
// TODO we really want batching for this!
|
|
||||||
log.Info("purging cold objects from the hotstore")
|
log.Info("purging cold objects from the hotstore")
|
||||||
startPurge := time.Now()
|
startPurge := time.Now()
|
||||||
for cid := range cold {
|
err = s.purgeBlocks(cold)
|
||||||
// delete the object from the hotstore
|
if err != nil {
|
||||||
err = s.hot.DeleteBlock(cid)
|
// TODO do something better here
|
||||||
if err != nil {
|
panic(err)
|
||||||
log.Errorf("error deleting block %s from hotstore: %s", cid, err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
|
log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
|
||||||
|
|
||||||
// 2.4 remove the tracker tracking for cold objects
|
// 2.4 remove the tracker tracking for cold objects
|
||||||
startPurge = time.Now()
|
startPurge = time.Now()
|
||||||
log.Info("purging cold objects from tracker")
|
log.Info("purging cold objects from tracker")
|
||||||
|
err = s.purgeTracking(cold)
|
||||||
err = s.tracker.DeleteBatch(cold)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error purging cold objects from tracker: %s", err)
|
// TODO do something better here
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
log.Infow("purging cold from tracker done", "took", time.Since(startPurge))
|
log.Infow("purging cold from tracker done", "took", time.Since(startPurge))
|
||||||
@ -683,6 +637,68 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) moveColdBlocks(cold map[cid.Cid]struct{}) error {
|
||||||
|
batch := make([]blocks.Block, 0, batchSize)
|
||||||
|
|
||||||
|
for cid := range cold {
|
||||||
|
blk, err := s.hot.Get(cid)
|
||||||
|
if err != nil {
|
||||||
|
if err == dstore.ErrNotFound {
|
||||||
|
// this can happen if the node is killed after we have deleted the block from the hotstore
|
||||||
|
// but before we have deleted it from the tracker; just delete the tracker.
|
||||||
|
err = s.tracker.Delete(cid)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error deleting unreachable cid %s from tracker: %w", cid, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w", cid, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
batch = append(batch, blk)
|
||||||
|
if len(batch) == batchSize {
|
||||||
|
err = s.cold.PutMany(batch)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error putting batch to coldstore: %w", err)
|
||||||
|
}
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(batch) > 0 {
|
||||||
|
err := s.cold.PutMany(batch)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error putting cold to coldstore: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) purgeBlocks(cids map[cid.Cid]struct{}) error {
|
||||||
|
// TODO batch deletion -- this is very slow with many objects, but we need
|
||||||
|
// a DeleteBatch method in the blockstore interface
|
||||||
|
for cid := range cids {
|
||||||
|
err := s.hot.DeleteBlock(cid)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error deleting block %s from hotstore: %e", cid, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) purgeTracking(cids map[cid.Cid]struct{}) error {
|
||||||
|
err := s.tracker.DeleteBatch(cids)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error deleting batch from tracker: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||||
epoch := curTs.Height()
|
epoch := curTs.Height()
|
||||||
coldEpoch := s.baseEpoch + CompactionCold
|
coldEpoch := s.baseEpoch + CompactionCold
|
||||||
@ -825,74 +841,29 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
|||||||
// 2.2 copy the cold objects to the coldstore
|
// 2.2 copy the cold objects to the coldstore
|
||||||
log.Info("moving cold objects to the coldstore")
|
log.Info("moving cold objects to the coldstore")
|
||||||
startMove := time.Now()
|
startMove := time.Now()
|
||||||
|
err = s.moveColdBlocks(cold)
|
||||||
batch := make([]blocks.Block, 0, batchSize)
|
if err != nil {
|
||||||
for cid := range cold {
|
// TODO do something better here
|
||||||
blk, err := s.hot.Get(cid)
|
panic(err)
|
||||||
if err != nil {
|
|
||||||
if err == dstore.ErrNotFound {
|
|
||||||
// this can happen if the node is killed after we have deleted the block from the hotstore
|
|
||||||
// but before we have deleted it from the tracker; just delete the tracker.
|
|
||||||
err = s.tracker.Delete(cid)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error deleting cid %s from tracker: %s", cid, err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Errorf("error retrieving tracked block %s from hotstore: %s", cid, err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
batch = append(batch, blk)
|
|
||||||
if len(batch) == batchSize {
|
|
||||||
err = s.cold.PutMany(batch)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error putting cold batch to coldstore: %s", err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
batch = batch[:0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(batch) > 0 {
|
|
||||||
err = s.cold.PutMany(batch)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error putting cold batch to coldstore: %s", err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Infow("moving done", "took", time.Since(startMove))
|
log.Infow("moving done", "took", time.Since(startMove))
|
||||||
|
|
||||||
// 2.3 delete cold objects from the hotstore
|
// 2.3 delete cold objects from the hotstore
|
||||||
// TODO we really want batching for this!
|
|
||||||
log.Info("purging cold objects from the hotstore")
|
log.Info("purging cold objects from the hotstore")
|
||||||
startPurge := time.Now()
|
startPurge := time.Now()
|
||||||
for cid := range cold {
|
err = s.purgeBlocks(cold)
|
||||||
// delete the object from the hotstore
|
if err != nil {
|
||||||
err = s.hot.DeleteBlock(cid)
|
// TODO do something better here
|
||||||
if err != nil {
|
panic(err)
|
||||||
log.Errorf("error deleting block %s from hotstore: %s", cid, err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
|
log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
|
||||||
|
|
||||||
// 2.4 remove the tracker tracking for cold objects
|
// 2.4 remove the tracker tracking for cold objects
|
||||||
startPurge = time.Now()
|
startPurge = time.Now()
|
||||||
log.Info("purging cold objects from tracker")
|
log.Info("purging cold objects from tracker")
|
||||||
|
err = s.purgeTracking(cold)
|
||||||
err = s.tracker.DeleteBatch(cold)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error purging cold objects from tracker: %s", err)
|
// TODO do something better here
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
log.Infow("purging cold from tracker done", "took", time.Since(startPurge))
|
log.Infow("purging cold from tracker done", "took", time.Since(startPurge))
|
||||||
@ -900,32 +871,20 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
|||||||
// 3. if we have dead objects, delete them from the hotstore and remove the tracking
|
// 3. if we have dead objects, delete them from the hotstore and remove the tracking
|
||||||
if len(dead) > 0 {
|
if len(dead) > 0 {
|
||||||
log.Info("deleting dead objects")
|
log.Info("deleting dead objects")
|
||||||
|
err = s.purgeBlocks(dead)
|
||||||
startPurge = time.Now()
|
if err != nil {
|
||||||
log.Info("purging dead objects from the hotstore")
|
// TODO do something better here
|
||||||
// TODO we really want batching for this!
|
panic(err)
|
||||||
for cid := range dead {
|
|
||||||
// delete the object from the hotstore
|
|
||||||
err = s.hot.DeleteBlock(cid)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error deleting block %s from hotstore: %s", cid, err)
|
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
log.Infow("purging dead from hotstore done", "took", time.Since(startPurge))
|
|
||||||
|
|
||||||
// remove the tracker tracking
|
// remove the tracker tracking
|
||||||
startPurge := time.Now()
|
startPurge := time.Now()
|
||||||
log.Info("purging dead objects from tracker")
|
log.Info("purging dead objects from tracker")
|
||||||
|
err = s.purgeTracking(dead)
|
||||||
err = s.tracker.DeleteBatch(dead)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error purging dead objects from tracker: %s", err)
|
// TODO do something better here
|
||||||
// TODO do something better here -- just continue?
|
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("purging dead from tracker done", "took", time.Since(startPurge))
|
log.Infow("purging dead from tracker done", "took", time.Since(startPurge))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user