address review comments

This commit is contained in:
vyzo 2021-07-10 16:30:27 +03:00
parent 870a47f55d
commit 0c5e336ff1

View File

@ -13,6 +13,7 @@ import (
"time"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
@ -110,10 +111,16 @@ type ChainAccessor interface {
SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
}
// hotstore is the interface that must be satisfied by the hot blockstore; it is an extension
// of the Blockstore interface with the traits we need for compaction.
//
// Open type-asserts the supplied hot blockstore to this interface and returns an error if the
// assertion fails, so the rest of the splitstore can call the iterator methods (e.g. ForEachKey
// during compaction) directly on the hot store without a per-call type assertion.
type hotstore interface {
// regular blockstore operations
bstore.Blockstore
// key iteration over the store, used by compaction
bstore.BlockstoreIterator
}
type SplitStore struct {
compacting int32 // compaction (or warm up) in progress
critsection int32 // compaction critical section
closing int32 // the splitstore is closing
compacting int32 // compaction (or warm up) in progress
closing int32 // the splitstore is closing
cfg *Config
@ -125,8 +132,8 @@ type SplitStore struct {
chain ChainAccessor
ds dstore.Datastore
hot bstore.Blockstore
cold bstore.Blockstore
hot hotstore
markSetEnv MarkSetEnv
markSetSize int64
@ -139,7 +146,7 @@ type SplitStore struct {
// transactional protection for concurrent read/writes during compaction
txnLk sync.RWMutex
txnActive bool
txnViews *sync.WaitGroup
txnViews sync.WaitGroup
txnProtect MarkSet
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
@ -162,9 +169,15 @@ func init() {
// is backed by the provided hot and cold stores. The returned SplitStore MUST be
// attached to the ChainStore with Start in order to trigger compaction.
func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) {
// hot blockstore must support BlockstoreIterator
if _, ok := hot.(bstore.BlockstoreIterator); !ok {
return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot)
// hot blockstore must support the hotstore interface
hots, ok := hot.(hotstore)
if !ok {
// be specific about what is missing
if _, ok := hot.(bstore.BlockstoreIterator); !ok {
return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot)
}
return nil, xerrors.Errorf("hot blockstore does not support the necessary traits: %T", hot)
}
// the markset env
@ -177,12 +190,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ss := &SplitStore{
cfg: cfg,
ds: ds,
hot: hot,
cold: cold,
hot: hots,
markSetEnv: markSetEnv,
txnViews: new(sync.WaitGroup),
coldPurgeSize: defaultColdPurgeSize,
}
@ -252,18 +263,13 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
return blk, nil
case bstore.ErrNotFound:
if s.debug != nil {
s.mx.Lock()
warm := s.warmupEpoch > 0
s.mx.Unlock()
if warm {
s.debug.LogReadMiss(cid)
}
if s.isWarm() {
s.debug.LogReadMiss(cid)
}
blk, err = s.cold.Get(cid)
if err == nil {
stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return blk, err
@ -294,18 +300,13 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
return size, nil
case bstore.ErrNotFound:
if s.debug != nil {
s.mx.Lock()
warm := s.warmupEpoch > 0
s.mx.Unlock()
if warm {
s.debug.LogReadMiss(cid)
}
if s.isWarm() {
s.debug.LogReadMiss(cid)
}
size, err = s.cold.GetSize(cid)
if err == nil {
stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return size, err
@ -393,15 +394,21 @@ func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, err
}
ch := make(chan cid.Cid)
seen := cid.NewSet()
ch := make(chan cid.Cid, 8) // buffer is arbitrary, just enough to avoid context switches
go func() {
defer cancel()
defer close(ch)
for _, in := range []<-chan cid.Cid{chHot, chCold} {
for cid := range in {
for c := range in {
// ensure we only emit each key once
if !seen.Visit(c) {
continue
}
select {
case ch <- cid:
case ch <- c:
case <-ctx.Done():
return
}
@ -443,18 +450,13 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
err := s.hot.View(cid, cb)
switch err {
case bstore.ErrNotFound:
if s.debug != nil {
s.mx.Lock()
warm := s.warmupEpoch > 0
s.mx.Unlock()
if warm {
s.debug.LogReadMiss(cid)
}
if s.isWarm() {
s.debug.LogReadMiss(cid)
}
err = s.cold.View(cid, cb)
if err == nil {
stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return err
@ -463,6 +465,12 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
}
}
// isWarm reports whether a warmup epoch has been recorded, i.e. whether
// s.warmupEpoch is greater than zero. The field is read while holding s.mx.
func (s *SplitStore) isWarm() bool {
	s.mx.Lock()
	warm := s.warmupEpoch > 0
	s.mx.Unlock()

	return warm
}
// State tracking
func (s *SplitStore) Start(chain ChainAccessor) error {
s.chain = chain
@ -527,11 +535,14 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
}
func (s *SplitStore) Close() error {
atomic.StoreInt32(&s.closing, 1)
if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
// already closing
return nil
}
if atomic.LoadInt32(&s.critsection) == 1 {
log.Warn("ongoing compaction in critical section; waiting for it to finish...")
for atomic.LoadInt32(&s.critsection) == 1 {
if atomic.LoadInt32(&s.compacting) == 1 {
log.Warn("close with ongoing compaction in progress; waiting for it to finish...")
for atomic.LoadInt32(&s.compacting) == 1 {
time.Sleep(time.Second)
}
}
@ -549,12 +560,24 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
curTs := apply[len(apply)-1]
epoch := curTs.Height()
// NOTE: there is an implicit invariant assumption that HeadChange is invoked
// synchronously and no other HeadChange can be invoked while one is in
// progress.
// this is guaranteed by the chainstore, and it is pervasive in all lotus
// -- if that ever changes then all hell will break loose in general and
// we will have a race to protectTipSets here.
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
// we are currently compacting -- protect the new tipset(s)
s.protectTipSets(apply)
return nil
}
// check if we are actually closing first
if atomic.LoadInt32(&s.closing) == 1 {
atomic.StoreInt32(&s.compacting, 0)
return nil
}
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
if time.Since(timestamp) > SyncGapTime {
// don't attempt compaction before we have caught up syncing
@ -608,7 +631,7 @@ func (s *SplitStore) protectView(c cid.Cid) *sync.WaitGroup {
if !s.txnActive {
s.txnViews.Add(1)
return s.txnViews
return &s.txnViews
}
s.trackTxnRef(c)
@ -653,6 +676,8 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
}
s.txnRefsMx.Lock()
defer s.txnRefsMx.Unlock()
quiet := false
for _, c := range cids {
if isUnitaryObject(c) {
@ -676,7 +701,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
s.txnRefs[c] = struct{}{}
}
s.txnRefsMx.Unlock()
return
}
@ -714,6 +739,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
workch <- c
count++
}
close(workch)
if count == 0 {
return nil
@ -727,31 +753,23 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
workers = count
}
close(workch)
worker := func(wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}
worker := func() error {
for c := range workch {
err := s.doTxnProtect(c, markSet)
if err != nil {
log.Warnf("error protecting transactional references: %s", err)
return
return xerrors.Errorf("error protecting transactional references to %s: %w", c, err)
}
}
return nil
}
if workers > 1 {
wg := new(sync.WaitGroup)
for i := 0; i < workers; i++ {
wg.Add(1)
go worker(wg)
}
wg.Wait()
} else {
worker(nil)
g := new(errgroup.Group)
for i := 0; i < workers; i++ {
g.Go(worker)
}
if err := g.Wait(); err != nil {
return err
}
log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count)
@ -761,6 +779,10 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
// transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
if err := s.checkClosing(); err != nil {
return err
}
// Note: cold objects are deleted heaviest first, so the constituents of an object
// cannot be deleted before the object itself.
return s.walkObjectIncomplete(root, cid.NewSet(),
@ -918,7 +940,7 @@ func (s *SplitStore) compact(curTs *types.TipSet, wg *sync.WaitGroup) {
start = time.Now()
err := s.doCompact(curTs)
took := time.Since(start).Milliseconds()
stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
stats.Record(s.ctx, metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
if err != nil {
log.Errorf("COMPACTION ERROR: %s", err)
@ -991,7 +1013,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
var hotCnt, coldCnt int
cold := make([]cid.Cid, 0, s.coldPurgeSize)
err = s.hot.(bstore.BlockstoreIterator).ForEachKey(func(c cid.Cid) error {
err = s.hot.ForEachKey(func(c cid.Cid) error {
// was it marked?
mark, err := markSet.Has(c)
if err != nil {
@ -1021,8 +1043,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
if err := s.checkClosing(); err != nil {
return err
@ -1064,6 +1086,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("sorting done", "took", time.Since(startSort))
// 4.1 protect transactional refs once more
// strictly speaking, this is not necessary as purge will do it before deleting each
// batch. however, there is likely a largish number of references accumulated during
// the sort and this protects before entering purge context.
err = s.protectTxnRefs(markSet)
if err != nil {
return xerrors.Errorf("error protecting transactional refs: %w", err)
@ -1073,16 +1098,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err
}
// Enter critical section
log.Info("entering critical section")
atomic.StoreInt32(&s.critsection, 1)
defer atomic.StoreInt32(&s.critsection, 0)
// check to see if we are closing first; if that's the case just return
if err := s.checkClosing(); err != nil {
return err
}
// 5. purge cold objects from the hotstore, taking protected references into account
log.Info("purging cold objects from the hotstore")
startPurge := time.Now()
@ -1119,10 +1134,7 @@ func (s *SplitStore) beginTxnProtect() *sync.WaitGroup {
s.txnRefs = make(map[cid.Cid]struct{})
s.txnMissing = make(map[cid.Cid]struct{})
wg := s.txnViews
s.txnViews = nil
return wg
return &s.txnViews
}
func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
@ -1141,11 +1153,13 @@ func (s *SplitStore) endTxnProtect() {
return
}
// release markset memory
s.txnProtect.Close()
s.txnActive = false
s.txnProtect = nil
s.txnRefs = nil
s.txnMissing = nil
s.txnViews = new(sync.WaitGroup)
}
func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
@ -1238,6 +1252,11 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
return nil
}
// check this before recursing
if err := s.checkClosing(); err != nil {
return err
}
var links []cid.Cid
err := s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
@ -1294,6 +1313,11 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
return nil
}
// check this before recursing
if err := s.checkClosing(); err != nil {
return err
}
var links []cid.Cid
err := s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
@ -1522,24 +1546,28 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
func(cids []cid.Cid) error {
deadCids := deadCids[:0]
again:
if err := s.checkClosing(); err != nil {
return err
}
for {
if err := s.checkClosing(); err != nil {
return err
}
s.txnLk.Lock()
if len(s.txnRefs) > 0 {
s.txnLk.Lock()
if len(s.txnRefs) == 0 {
// keep the lock!
break
}
// unlock and protect
s.txnLk.Unlock()
err := s.protectTxnRefs(markSet)
if err != nil {
return xerrors.Errorf("error protecting transactional refs: %w", err)
}
goto again
}
defer s.txnLk.Unlock()
for _, c := range cids {
live, err := markSet.Has(c)
if err != nil {