refactor txn reference tracking, do deep marking of DAGs
This commit is contained in:
parent
a98a062347
commit
bd92c230da
@ -232,21 +232,11 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
|
|||||||
// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
|
// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
|
||||||
// writes on Flush. When we have options in the API, the vm can explicitly signal that this is
|
// writes on Flush. When we have options in the API, the vm can explicitly signal that this is
|
||||||
// an implicit Write.
|
// an implicit Write.
|
||||||
if s.isPendingWrite(c) {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
s.trackWrite(c)
|
s.trackWrite(c)
|
||||||
|
|
||||||
// also make sure the object is considered live during compaction in case we have already
|
// also make sure the object is considered live during compaction in case we have already
|
||||||
// flushed pending writes and started compaction
|
// flushed pending writes and started compaction
|
||||||
if s.txnProtect != nil {
|
s.trackTxnRef(c)
|
||||||
err = s.txnProtect.Mark(c)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error protecting object (cid: %s) in compaction transaction: %s", c, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
@ -262,13 +252,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
|
|||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
if s.txnProtect != nil {
|
s.trackTxnRef(cid)
|
||||||
err = s.txnProtect.Mark(cid)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error protecting object in compaction transaction: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return blk, err
|
return blk, err
|
||||||
|
|
||||||
case bstore.ErrNotFound:
|
case bstore.ErrNotFound:
|
||||||
@ -300,13 +284,7 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
|
|||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
if s.txnProtect != nil {
|
s.trackTxnRef(cid)
|
||||||
err = s.txnProtect.Mark(cid)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error protecting object in compaction transaction: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return size, err
|
return size, err
|
||||||
|
|
||||||
case bstore.ErrNotFound:
|
case bstore.ErrNotFound:
|
||||||
@ -336,15 +314,8 @@ func (s *SplitStore) Put(blk blocks.Block) error {
|
|||||||
s.trackWrite(blk.Cid())
|
s.trackWrite(blk.Cid())
|
||||||
|
|
||||||
err := s.hot.Put(blk)
|
err := s.hot.Put(blk)
|
||||||
if err == nil && s.txnProtect != nil {
|
if err == nil {
|
||||||
err = s.txnProtect.Mark(blk.Cid())
|
s.trackTxnRef(blk.Cid())
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error protecting object in compaction transaction: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error putting block %s in hotstore: %s", blk.Cid(), err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -362,18 +333,8 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
|
|||||||
s.trackWriteMany(batch)
|
s.trackWriteMany(batch)
|
||||||
|
|
||||||
err := s.hot.PutMany(blks)
|
err := s.hot.PutMany(blks)
|
||||||
if err == nil && s.txnProtect != nil {
|
if err == nil {
|
||||||
for _, cid := range batch {
|
s.trackTxnRefMany(batch)
|
||||||
err2 := s.txnProtect.Mark(cid)
|
|
||||||
if err2 != nil {
|
|
||||||
log.Errorf("error protecting object in compaction transaction: %s", err)
|
|
||||||
err = multierr.Combine(err, err2)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error putting batch in hotstore: %s", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -425,14 +386,8 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
|
|||||||
err := s.hot.View(cid, cb)
|
err := s.hot.View(cid, cb)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
if s.txnProtect != nil {
|
s.trackTxnRef(cid)
|
||||||
err = s.txnProtect.Mark(cid)
|
return nil
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error protecting object in compaction transaction: %s", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
|
|
||||||
case bstore.ErrNotFound:
|
case bstore.ErrNotFound:
|
||||||
s.mx.Lock()
|
s.mx.Lock()
|
||||||
@ -628,14 +583,6 @@ func (s *SplitStore) trackWriteMany(cids []cid.Cid) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) isPendingWrite(c cid.Cid) bool {
|
|
||||||
s.mx.Lock()
|
|
||||||
defer s.mx.Unlock()
|
|
||||||
|
|
||||||
_, ok := s.pendingWrites[c]
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SplitStore) flushPendingWrites(locked bool) {
|
func (s *SplitStore) flushPendingWrites(locked bool) {
|
||||||
if !locked {
|
if !locked {
|
||||||
s.mx.Lock()
|
s.mx.Lock()
|
||||||
@ -679,6 +626,49 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
|
|||||||
s.debug.LogWriteMany(s.curTs, cids, epoch)
|
s.debug.LogWriteMany(s.curTs, cids, epoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) trackTxnRef(c cid.Cid) {
|
||||||
|
if s.txnProtect == nil {
|
||||||
|
// not compacting
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: this occurs check assumes a markset without false positives, which is currently the case
|
||||||
|
// with the map
|
||||||
|
has, err := s.txnProtect.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error occur checking object (cid: %s) for compaction transaction: %s", c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if has {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Prefix().Codec != cid.DagCBOR {
|
||||||
|
err = s.txnProtect.Mark(c)
|
||||||
|
} else {
|
||||||
|
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
|
||||||
|
return s.txnProtect.Mark(c)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error protecting object (cid: %s) from compaction transaction: %s", c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
|
||||||
|
if s.txnProtect == nil {
|
||||||
|
// not compacting
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cids {
|
||||||
|
s.trackTxnRef(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SplitStore) background() {
|
func (s *SplitStore) background() {
|
||||||
ticker := time.NewTicker(time.Second)
|
ticker := time.NewTicker(time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
Loading…
Reference in New Issue
Block a user