diff --git a/blockstore/splitstore/splitstore_check.go b/blockstore/splitstore/splitstore_check.go index a36d0b78d..0b4cfe044 100644 --- a/blockstore/splitstore/splitstore_check.go +++ b/blockstore/splitstore/splitstore_check.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "sync/atomic" "time" @@ -67,7 +68,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { } defer output.Close() //nolint:errcheck + var mx sync.Mutex write := func(format string, args ...interface{}) { + mx.Lock() + defer mx.Unlock() _, err := fmt.Fprintf(output, format+"\n", args...) if err != nil { log.Warnf("error writing check output: %s", err) @@ -82,7 +86,8 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { write("compaction index: %d", s.compactionIndex) write("--") - var coldCnt, missingCnt int64 + coldCnt := new(int64) + missingCnt := new(int64) visitor, err := s.markSetEnv.Create("check", 0) if err != nil { @@ -111,10 +116,10 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { } if has { - coldCnt++ + atomic.AddInt64(coldCnt, 1) write("cold object reference: %s", c) } else { - missingCnt++ + atomic.AddInt64(missingCnt, 1) write("missing object reference: %s", c) return errStopWalk } @@ -128,9 +133,9 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { return err } - log.Infow("check done", "cold", coldCnt, "missing", missingCnt) + log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt) write("--") - write("cold: %d missing: %d", coldCnt, missingCnt) + write("cold: %d missing: %d", *coldCnt, *missingCnt) write("DONE") return nil diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go index 977c4d392..0670bd0f6 100644 --- a/blockstore/splitstore/splitstore_warmup.go +++ b/blockstore/splitstore/splitstore_warmup.go @@ -1,6 +1,7 @@ package splitstore import ( + "sync" "sync/atomic" "time" @@ -55,10 +56,11 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { if WarmupBoundary < epoch { boundaryEpoch = epoch - WarmupBoundary } + var mx sync.Mutex batchHot := make([]blocks.Block, 0, batchSize) - count := int64(0) - xcount := int64(0) - missing := int64(0) + count := new(int64) + xcount := new(int64) + missing := new(int64) visitor, err := s.markSetEnv.Create("warmup", 0) if err != nil { @@ -73,7 +75,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { return errStopWalk } - count++ + atomic.AddInt64(count, 1) has, err := s.hot.Has(s.ctx, c) if err != nil { @@ -87,22 +89,25 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { blk, err := s.cold.Get(s.ctx, c) if err != nil { if err == bstore.ErrNotFound { - missing++ + atomic.AddInt64(missing, 1) return errStopWalk } return err } - xcount++ + atomic.AddInt64(xcount, 1) + mx.Lock() batchHot = append(batchHot, blk) if len(batchHot) == batchSize { err = s.hot.PutMany(s.ctx, batchHot) if err != nil { + mx.Unlock() return err } batchHot = batchHot[:0] } + mx.Unlock() return nil }) @@ -118,9 +123,9 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { } } - log.Infow("warmup stats", "visited", count, "warm", xcount, "missing", missing) + log.Infow("warmup stats", "visited", *count, "warm", *xcount, "missing", *missing) - s.markSetSize = count + count>>2 // overestimate a bit + s.markSetSize = *count + *count>>2 // overestimate a bit err = s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize)) if err != nil { log.Warnf("error saving mark set size: %s", err)