fix check and warmup for parallel walk

This commit is contained in:
vyzo 2022-01-26 09:01:51 +02:00
parent 10f2445a99
commit fe47d6a1a4
2 changed files with 23 additions and 13 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -67,7 +68,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
} }
defer output.Close() //nolint:errcheck defer output.Close() //nolint:errcheck
var mx sync.Mutex
write := func(format string, args ...interface{}) { write := func(format string, args ...interface{}) {
mx.Lock()
defer mx.Unlock()
_, err := fmt.Fprintf(output, format+"\n", args...) _, err := fmt.Fprintf(output, format+"\n", args...)
if err != nil { if err != nil {
log.Warnf("error writing check output: %s", err) log.Warnf("error writing check output: %s", err)
@ -82,7 +86,8 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
write("compaction index: %d", s.compactionIndex) write("compaction index: %d", s.compactionIndex)
write("--") write("--")
var coldCnt, missingCnt int64 coldCnt := new(int64)
missingCnt := new(int64)
visitor, err := s.markSetEnv.Create("check", 0) visitor, err := s.markSetEnv.Create("check", 0)
if err != nil { if err != nil {
@ -111,10 +116,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
} }
if has { if has {
coldCnt++ atomic.AddInt64(coldCnt, 1)
write("cold object reference: %s", c) write("cold object reference: %s", c)
} else { } else {
missingCnt++ atomic.AddInt64(missingCnt, 1)
write("missing object reference: %s", c) write("missing object reference: %s", c)
return errStopWalk return errStopWalk
} }
@ -128,9 +133,9 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
return err return err
} }
log.Infow("check done", "cold", coldCnt, "missing", missingCnt) log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt)
write("--") write("--")
write("cold: %d missing: %d", coldCnt, missingCnt) write("cold: %d missing: %d", *coldCnt, *missingCnt)
write("DONE") write("DONE")
return nil return nil

View File

@ -1,6 +1,7 @@
package splitstore package splitstore
import ( import (
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -55,10 +56,11 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
if WarmupBoundary < epoch { if WarmupBoundary < epoch {
boundaryEpoch = epoch - WarmupBoundary boundaryEpoch = epoch - WarmupBoundary
} }
var mx sync.Mutex
batchHot := make([]blocks.Block, 0, batchSize) batchHot := make([]blocks.Block, 0, batchSize)
count := int64(0) count := new(int64)
xcount := int64(0) xcount := new(int64)
missing := int64(0) missing := new(int64)
visitor, err := s.markSetEnv.Create("warmup", 0) visitor, err := s.markSetEnv.Create("warmup", 0)
if err != nil { if err != nil {
@ -73,7 +75,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
return errStopWalk return errStopWalk
} }
count++ atomic.AddInt64(count, 1)
has, err := s.hot.Has(s.ctx, c) has, err := s.hot.Has(s.ctx, c)
if err != nil { if err != nil {
@ -87,22 +89,25 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
blk, err := s.cold.Get(s.ctx, c) blk, err := s.cold.Get(s.ctx, c)
if err != nil { if err != nil {
if err == bstore.ErrNotFound { if err == bstore.ErrNotFound {
missing++ atomic.AddInt64(missing, 1)
return errStopWalk return errStopWalk
} }
return err return err
} }
xcount++ atomic.AddInt64(xcount, 1)
mx.Lock()
batchHot = append(batchHot, blk) batchHot = append(batchHot, blk)
if len(batchHot) == batchSize { if len(batchHot) == batchSize {
err = s.hot.PutMany(s.ctx, batchHot) err = s.hot.PutMany(s.ctx, batchHot)
if err != nil { if err != nil {
mx.Unlock()
return err return err
} }
batchHot = batchHot[:0] batchHot = batchHot[:0]
} }
mx.Unlock()
return nil return nil
}) })
@ -118,9 +123,9 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
} }
} }
log.Infow("warmup stats", "visited", count, "warm", xcount, "missing", missing) log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing)
s.markSetSize = count + count>>2 // overestimate a bit s.markSetSize = *count + *count>>2 // overestimate a bit
err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize)) err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil { if err != nil {
log.Warnf("error saving mark set size: %s", err) log.Warnf("error saving mark set size: %s", err)