Merge pull request #8149 from filecoin-project/fix/reify-limit
fix: limit reification sizes
This commit is contained in:
commit
051ff5d205
@ -1,6 +1,7 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
|
||||
@ -10,13 +11,12 @@ import (
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
var EnableReification = false
|
||||
var (
|
||||
errReifyLimit = errors.New("reification limit reached")
|
||||
ReifyLimit = 16384
|
||||
)
|
||||
|
||||
func (s *SplitStore) reifyColdObject(c cid.Cid) {
|
||||
if !EnableReification {
|
||||
return
|
||||
}
|
||||
|
||||
if !s.isWarm() {
|
||||
return
|
||||
}
|
||||
@ -104,12 +104,18 @@ func (s *SplitStore) doReify(c cid.Cid) {
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
count := 0
|
||||
err := s.walkObjectIncomplete(c, newTmpVisitor(),
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
count++
|
||||
if count > ReifyLimit {
|
||||
return errReifyLimit
|
||||
}
|
||||
|
||||
s.reifyMx.Lock()
|
||||
_, inProgress := s.reifyInProgress[c]
|
||||
if !inProgress {
|
||||
@ -150,6 +156,11 @@ func (s *SplitStore) doReify(c cid.Cid) {
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if xerrors.Is(err, errReifyLimit) {
|
||||
log.Debug("reification aborted; reify limit reached")
|
||||
return
|
||||
}
|
||||
|
||||
log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
|
||||
return
|
||||
}
|
||||
|
@ -495,8 +495,102 @@ func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.
|
||||
}
|
||||
}
|
||||
|
||||
func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
|
||||
ds := dssync.MutexWrap(datastore.NewMapDatastore())
|
||||
hot := newMockStore()
|
||||
cold := newMockStore()
|
||||
|
||||
mkRandomBlock := func() blocks.Block {
|
||||
data := make([]byte, 128)
|
||||
_, err := rand.Read(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return blocks.NewBlock(data)
|
||||
}
|
||||
|
||||
block1 := mkRandomBlock()
|
||||
block2 := mkRandomBlock()
|
||||
block3 := mkRandomBlock()
|
||||
|
||||
hdr := mock.MkBlock(nil, 0, 0)
|
||||
hdr.Messages = block1.Cid()
|
||||
hdr.ParentMessageReceipts = block2.Cid()
|
||||
hdr.ParentStateRoot = block3.Cid()
|
||||
block4, err := hdr.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
allBlocks := []blocks.Block{block1, block2, block3, block4}
|
||||
for _, blk := range allBlocks {
|
||||
err := cold.Put(context.Background(), blk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
path, err := ioutil.TempDir("", "splitstore.*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = os.RemoveAll(path)
|
||||
})
|
||||
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ss.Close() //nolint
|
||||
|
||||
ss.warmupEpoch = 1
|
||||
go ss.reifyOrchestrator()
|
||||
|
||||
waitForReification := func() {
|
||||
for {
|
||||
ss.reifyMx.Lock()
|
||||
ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
|
||||
ss.reifyMx.Unlock()
|
||||
|
||||
if ready {
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// do a hot access -- nothing should be reified as the limit should be exceeded
|
||||
oldReifyLimit := ReifyLimit
|
||||
ReifyLimit = 2
|
||||
t.Cleanup(func() {
|
||||
ReifyLimit = oldReifyLimit
|
||||
})
|
||||
|
||||
err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
waitForReification()
|
||||
|
||||
for _, blk := range allBlocks {
|
||||
has, err := hot.Has(context.Background(), blk.Cid())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if has {
|
||||
t.Fatal("block unexpectedly reified")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSplitStoreReification(t *testing.T) {
|
||||
EnableReification = true
|
||||
t.Log("test reification with Has")
|
||||
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
|
||||
_, err := s.Has(ctx, c)
|
||||
@ -516,6 +610,11 @@ func TestSplitStoreReification(t *testing.T) {
|
||||
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
|
||||
return s.View(ctx, c, func(_ []byte) error { return nil })
|
||||
})
|
||||
t.Log("test reification limit")
|
||||
testSplitStoreReificationLimit(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
|
||||
_, err := s.Has(ctx, c)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
type mockChain struct {
|
||||
|
@ -26,6 +26,10 @@ type tmpVisitor struct {
|
||||
var _ ObjectVisitor = (*tmpVisitor)(nil)
|
||||
|
||||
func (v *tmpVisitor) Visit(c cid.Cid) (bool, error) {
|
||||
if isUnitaryObject(c) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return v.set.Visit(c), nil
|
||||
}
|
||||
|
||||
@ -45,6 +49,10 @@ func newConcurrentVisitor() *concurrentVisitor {
|
||||
}
|
||||
|
||||
func (v *concurrentVisitor) Visit(c cid.Cid) (bool, error) {
|
||||
if isUnitaryObject(c) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
v.mx.Lock()
|
||||
defer v.mx.Unlock()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user