2022-02-04 12:29:50 +00:00
|
|
|
package splitstore
|
|
|
|
|
|
|
|
import (
|
2022-02-18 10:19:09 +00:00
|
|
|
"errors"
|
2022-02-04 12:29:50 +00:00
|
|
|
"runtime"
|
|
|
|
"sync/atomic"
|
|
|
|
|
2023-03-25 07:33:05 +00:00
|
|
|
blocks "github.com/ipfs/go-block-format"
|
2022-06-15 10:06:22 +00:00
|
|
|
"github.com/ipfs/go-cid"
|
2022-06-14 15:00:51 +00:00
|
|
|
"golang.org/x/xerrors"
|
2022-02-04 12:29:50 +00:00
|
|
|
)
|
|
|
|
|
2022-02-18 10:19:09 +00:00
|
|
|
var (
|
|
|
|
errReifyLimit = errors.New("reification limit reached")
|
|
|
|
ReifyLimit = 16384
|
|
|
|
)
|
2022-02-17 17:56:50 +00:00
|
|
|
|
2022-02-04 12:29:50 +00:00
|
|
|
func (s *SplitStore) reifyColdObject(c cid.Cid) {
|
2022-02-14 14:04:39 +00:00
|
|
|
if !s.isWarm() {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-02-04 12:29:50 +00:00
|
|
|
if isUnitaryObject(c) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
s.reifyMx.Lock()
|
|
|
|
defer s.reifyMx.Unlock()
|
|
|
|
|
|
|
|
_, ok := s.reifyInProgress[c]
|
|
|
|
if ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
s.reifyPend[c] = struct{}{}
|
|
|
|
s.reifyCond.Broadcast()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SplitStore) reifyOrchestrator() {
|
|
|
|
workers := runtime.NumCPU() / 4
|
|
|
|
if workers < 2 {
|
|
|
|
workers = 2
|
|
|
|
}
|
|
|
|
|
|
|
|
workch := make(chan cid.Cid, workers)
|
|
|
|
defer close(workch)
|
|
|
|
|
|
|
|
for i := 0; i < workers; i++ {
|
2022-02-14 14:10:54 +00:00
|
|
|
s.reifyWorkers.Add(1)
|
2022-02-04 12:29:50 +00:00
|
|
|
go s.reifyWorker(workch)
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
s.reifyMx.Lock()
|
|
|
|
for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 {
|
|
|
|
s.reifyCond.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
if atomic.LoadInt32(&s.closing) != 0 {
|
|
|
|
s.reifyMx.Unlock()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
reifyPend := s.reifyPend
|
|
|
|
s.reifyPend = make(map[cid.Cid]struct{})
|
|
|
|
s.reifyMx.Unlock()
|
|
|
|
|
|
|
|
for c := range reifyPend {
|
|
|
|
select {
|
|
|
|
case workch <- c:
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
|
2022-02-14 14:10:54 +00:00
|
|
|
defer s.reifyWorkers.Done()
|
2022-02-04 12:29:50 +00:00
|
|
|
for c := range workch {
|
|
|
|
s.doReify(c)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *SplitStore) doReify(c cid.Cid) {
|
2022-08-05 20:34:16 +00:00
|
|
|
var toreify, toforget []cid.Cid
|
2022-02-04 12:29:50 +00:00
|
|
|
|
|
|
|
defer func() {
|
|
|
|
s.reifyMx.Lock()
|
|
|
|
defer s.reifyMx.Unlock()
|
|
|
|
|
|
|
|
for _, c := range toreify {
|
|
|
|
delete(s.reifyInProgress, c)
|
|
|
|
}
|
|
|
|
for _, c := range toforget {
|
|
|
|
delete(s.reifyInProgress, c)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
s.txnLk.RLock()
|
|
|
|
defer s.txnLk.RUnlock()
|
|
|
|
|
2022-02-18 10:19:09 +00:00
|
|
|
count := 0
|
2023-03-03 13:53:23 +00:00
|
|
|
_, err := s.walkObjectIncomplete(c, newTmpVisitor(),
|
2022-02-04 12:29:50 +00:00
|
|
|
func(c cid.Cid) error {
|
|
|
|
if isUnitaryObject(c) {
|
|
|
|
return errStopWalk
|
|
|
|
}
|
|
|
|
|
2022-02-18 10:19:09 +00:00
|
|
|
count++
|
|
|
|
if count > ReifyLimit {
|
|
|
|
return errReifyLimit
|
|
|
|
}
|
|
|
|
|
2022-02-04 12:29:50 +00:00
|
|
|
s.reifyMx.Lock()
|
|
|
|
_, inProgress := s.reifyInProgress[c]
|
|
|
|
if !inProgress {
|
|
|
|
s.reifyInProgress[c] = struct{}{}
|
|
|
|
}
|
|
|
|
s.reifyMx.Unlock()
|
|
|
|
|
|
|
|
if inProgress {
|
|
|
|
return errStopWalk
|
|
|
|
}
|
|
|
|
|
|
|
|
has, err := s.hot.Has(s.ctx, c)
|
|
|
|
if err != nil {
|
|
|
|
return xerrors.Errorf("error checking hotstore: %w", err)
|
|
|
|
}
|
|
|
|
|
2022-08-05 20:34:16 +00:00
|
|
|
// All reified blocks are tracked at reification start
|
2022-02-04 12:29:50 +00:00
|
|
|
if has {
|
2022-08-05 20:34:16 +00:00
|
|
|
toforget = append(toforget, c)
|
|
|
|
return errStopWalk
|
2022-02-04 12:29:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
toreify = append(toreify, c)
|
|
|
|
return nil
|
2022-02-17 10:52:52 +00:00
|
|
|
},
|
|
|
|
func(missing cid.Cid) error {
|
|
|
|
log.Warnf("missing reference while reifying %s: %s", c, missing)
|
|
|
|
return errStopWalk
|
2022-02-04 12:29:50 +00:00
|
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
2022-08-05 20:34:16 +00:00
|
|
|
if errors.Is(err, errReifyLimit) {
|
2022-02-18 10:19:09 +00:00
|
|
|
log.Debug("reification aborted; reify limit reached")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-02-04 12:29:50 +00:00
|
|
|
log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Debugf("reifying %d objects rooted at %s", len(toreify), c)
|
|
|
|
|
2022-02-14 14:13:54 +00:00
|
|
|
// this should not get too big, maybe some 100s of objects.
|
2022-02-04 12:29:50 +00:00
|
|
|
batch := make([]blocks.Block, 0, len(toreify))
|
|
|
|
for _, c := range toreify {
|
|
|
|
blk, err := s.cold.Get(s.ctx, c)
|
|
|
|
if err != nil {
|
|
|
|
log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := s.checkClosing(); err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
batch = append(batch, blk)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(batch) > 0 {
|
|
|
|
err = s.hot.PutMany(s.ctx, batch)
|
|
|
|
if err != nil {
|
|
|
|
log.Warnf("error reifying cold object (cid: %s): %s", c, err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|