batch move objects from coldstore to hotstore

This commit is contained in:
vyzo 2021-02-28 10:21:48 +02:00
parent 97abbe1eca
commit 4cc672d0c5

View File

@ -394,6 +394,7 @@ func (s *SplitStore) compactSimple() {
// some stats for logging
var stHot, stCold int
// 2.1 iterate through the snoop and collect unreachable cold objects
err = s.snoop.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
// is the object stil hot?
if wrEpoch > coldEpoch {
@ -427,8 +428,13 @@ func (s *SplitStore) compactSimple() {
log.Infow("collection done", "took", time.Since(startCollect))
log.Infow("compaction stats", "hot", stHot, "cold", stCold)
// 2.2 copy the cold objects to the coldstore
log.Info("moving cold objects to the coldstore")
startMove := time.Now()
const batchSize = 1024
batch := make([]blocks.Block, 0, batchSize)
for cid := range cold {
blk, err := s.hot.Get(cid)
if err != nil {
@ -450,14 +456,33 @@ func (s *SplitStore) compactSimple() {
continue
}
// put the object in the coldstore
err = s.cold.Put(blk)
batch = append(batch, blk)
if len(batch) == batchSize {
err = s.cold.PutMany(batch)
if err != nil {
log.Errorf("error putting cold batch to coldstore: %s", err)
// TODO do something better here -- just continue?
panic(err)
}
batch = batch[:0]
}
}
if len(batch) > 0 {
err = s.cold.PutMany(batch)
if err != nil {
log.Errorf("error puting block %s to coldstore: %s", cid, err)
log.Errorf("error putting cold batch to coldstore: %s", err)
// TODO do something better here -- just continue?
panic(err)
}
}
log.Infow("moving done", "took", time.Since(startMove))
// 2.3 delete cold objects from the hotstore
// TODO we really want batching for this!
log.Info("purging cold objects from the hotstore")
purgeStart := time.Now()
for cid := range cold {
// delete the object from the hotstore
err = s.hot.DeleteBlock(cid)
if err != nil {
@ -466,10 +491,10 @@ func (s *SplitStore) compactSimple() {
panic(err)
}
}
log.Infow("moving done", "took", time.Since(startMove))
log.Infow("purging cold from hotstore done", "took", time.Since(purgeStart))
// remove the snoop tracking
purgeStart := time.Now()
// 2.4 remove the snoop tracking for cold objects
purgeStart = time.Now()
log.Info("purging cold objects from tracking store")
err = s.snoop.DeleteBatch(cold)
@ -478,8 +503,9 @@ func (s *SplitStore) compactSimple() {
// TODO do something better here -- just continue?
panic(err)
}
log.Infow("purging done", "took", time.Since(purgeStart))
log.Infow("purging cold from tracking store done", "took", time.Since(purgeStart))
// we are done; do some housekeeping
err = s.snoop.Sync()
if err != nil {
// TODO do something better here
@ -566,6 +592,7 @@ func (s *SplitStore) compactFull() {
cold := make(map[cid.Cid]struct{})
dead := make(map[cid.Cid]struct{})
// 2.1 iterate through the snoop and collect cold and dead objects
err = s.snoop.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
// is the object stil hot?
if wrEpoch > coldEpoch {
@ -622,7 +649,13 @@ func (s *SplitStore) compactFull() {
log.Infow("compaction stats", "hot", stHot, "cold", stCold, "dead", stDead)
// 2.2 copy the cold objects to the coldstore
log.Info("moving cold objects to the coldstore")
startMove := time.Now()
const batchSize = 1024
batch := make([]blocks.Block, 0, batchSize)
for cid := range cold {
blk, err := s.hot.Get(cid)
if err != nil {
@ -644,14 +677,33 @@ func (s *SplitStore) compactFull() {
continue
}
// put the object in the coldstore
err = s.cold.Put(blk)
batch = append(batch, blk)
if len(batch) == batchSize {
err = s.cold.PutMany(batch)
if err != nil {
log.Errorf("error putting cold batch to coldstore: %s", err)
// TODO do something better here -- just continue?
panic(err)
}
batch = batch[:0]
}
}
if len(batch) > 0 {
err = s.cold.PutMany(batch)
if err != nil {
log.Errorf("error puting block %s to coldstore: %s", cid, err)
log.Errorf("error putting cold batch to coldstore: %s", err)
// TODO do something better here -- just continue?
panic(err)
}
}
log.Infow("moving done", "took", time.Since(startMove))
// 2.3 delete cold objects from the hotstore
// TODO we really want batching for this!
log.Info("purging cold objects from the hotstore")
purgeStart := time.Now()
for cid := range cold {
// delete the object from the hotstore
err = s.hot.DeleteBlock(cid)
if err != nil {
@ -660,9 +712,10 @@ func (s *SplitStore) compactFull() {
panic(err)
}
}
log.Infow("purging cold from hotstore done", "took", time.Since(purgeStart))
// remove the snoop tracking
purgeStart := time.Now()
// 2.4 remove the snoop tracking for cold objects
purgeStart = time.Now()
log.Info("purging cold objects from tracking store")
err = s.snoop.DeleteBatch(cold)
@ -671,12 +724,15 @@ func (s *SplitStore) compactFull() {
// TODO do something better here -- just continue?
panic(err)
}
log.Infow("purging cold from tracking store done", "took", time.Since(purgeStart))
log.Infow("purging done", "took", time.Since(purgeStart))
// 3. if we have dead objects, delete them from the hotstore and remove the tracking
if len(dead) > 0 {
log.Info("deleting dead objects")
purgeStart = time.Now()
log.Info("purging dead objects from the hotstore")
// TODO we really want batching for this!
for cid := range dead {
// delete the object from the hotstore
err = s.hot.DeleteBlock(cid)
@ -686,6 +742,7 @@ func (s *SplitStore) compactFull() {
panic(err)
}
}
log.Infow("purging dead from hotstore done", "took", time.Since(purgeStart))
// remove the snoop tracking
purgeStart := time.Now()
@ -698,11 +755,12 @@ func (s *SplitStore) compactFull() {
panic(err)
}
log.Infow("purging done", "took", time.Since(purgeStart))
log.Infow("purging dead from trackingstore done", "took", time.Since(purgeStart))
}
log.Infow("sweeping done", "took", time.Since(startSweep))
// we are done; do some housekeeping
err = s.snoop.Sync()
if err != nil {
// TODO do something better here