lotus/blockstore/splitstore/splitstore_reify.go

182 lines
3.2 KiB
Go
Raw Normal View History

2022-02-04 12:29:50 +00:00
package splitstore
import (
2022-02-18 10:19:09 +00:00
"errors"
2022-02-04 12:29:50 +00:00
"runtime"
"sync/atomic"
"github.com/ipfs/go-cid"
blocks "github.com/ipfs/go-libipfs/blocks"
2022-06-14 15:00:51 +00:00
"golang.org/x/xerrors"
2022-02-04 12:29:50 +00:00
)
2022-02-18 10:19:09 +00:00
var (
errReifyLimit = errors.New("reification limit reached")
ReifyLimit = 16384
)
2022-02-04 12:29:50 +00:00
func (s *SplitStore) reifyColdObject(c cid.Cid) {
if !s.isWarm() {
return
}
2022-02-04 12:29:50 +00:00
if isUnitaryObject(c) {
return
}
s.reifyMx.Lock()
defer s.reifyMx.Unlock()
_, ok := s.reifyInProgress[c]
if ok {
return
}
s.reifyPend[c] = struct{}{}
s.reifyCond.Broadcast()
}
func (s *SplitStore) reifyOrchestrator() {
workers := runtime.NumCPU() / 4
if workers < 2 {
workers = 2
}
workch := make(chan cid.Cid, workers)
defer close(workch)
for i := 0; i < workers; i++ {
s.reifyWorkers.Add(1)
2022-02-04 12:29:50 +00:00
go s.reifyWorker(workch)
}
for {
s.reifyMx.Lock()
for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 {
s.reifyCond.Wait()
}
if atomic.LoadInt32(&s.closing) != 0 {
s.reifyMx.Unlock()
return
}
reifyPend := s.reifyPend
s.reifyPend = make(map[cid.Cid]struct{})
s.reifyMx.Unlock()
for c := range reifyPend {
select {
case workch <- c:
case <-s.ctx.Done():
return
}
}
}
}
func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
defer s.reifyWorkers.Done()
2022-02-04 12:29:50 +00:00
for c := range workch {
s.doReify(c)
}
}
func (s *SplitStore) doReify(c cid.Cid) {
var toreify, toforget []cid.Cid
2022-02-04 12:29:50 +00:00
defer func() {
s.reifyMx.Lock()
defer s.reifyMx.Unlock()
for _, c := range toreify {
delete(s.reifyInProgress, c)
}
for _, c := range toforget {
delete(s.reifyInProgress, c)
}
}()
s.txnLk.RLock()
defer s.txnLk.RUnlock()
2022-02-18 10:19:09 +00:00
count := 0
err := s.walkObjectIncomplete(c, newTmpVisitor(),
2022-02-04 12:29:50 +00:00
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
2022-02-18 10:19:09 +00:00
count++
if count > ReifyLimit {
return errReifyLimit
}
2022-02-04 12:29:50 +00:00
s.reifyMx.Lock()
_, inProgress := s.reifyInProgress[c]
if !inProgress {
s.reifyInProgress[c] = struct{}{}
}
s.reifyMx.Unlock()
if inProgress {
return errStopWalk
}
has, err := s.hot.Has(s.ctx, c)
if err != nil {
return xerrors.Errorf("error checking hotstore: %w", err)
}
// All reified blocks are tracked at reification start
2022-02-04 12:29:50 +00:00
if has {
toforget = append(toforget, c)
return errStopWalk
2022-02-04 12:29:50 +00:00
}
toreify = append(toreify, c)
return nil
},
func(missing cid.Cid) error {
log.Warnf("missing reference while reifying %s: %s", c, missing)
return errStopWalk
2022-02-04 12:29:50 +00:00
})
if err != nil {
if errors.Is(err, errReifyLimit) {
2022-02-18 10:19:09 +00:00
log.Debug("reification aborted; reify limit reached")
return
}
2022-02-04 12:29:50 +00:00
log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
return
}
log.Debugf("reifying %d objects rooted at %s", len(toreify), c)
// this should not get too big, maybe some 100s of objects.
2022-02-04 12:29:50 +00:00
batch := make([]blocks.Block, 0, len(toreify))
for _, c := range toreify {
blk, err := s.cold.Get(s.ctx, c)
if err != nil {
log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err)
continue
}
if err := s.checkClosing(); err != nil {
return
}
batch = append(batch, blk)
}
if len(batch) > 0 {
err = s.hot.PutMany(s.ctx, batch)
if err != nil {
log.Warnf("error reifying cold object (cid: %s): %s", c, err)
return
}
}
}