get rid of goroutine iteration in tracking store; long live ForEach

This commit is contained in:
vyzo 2021-02-26 15:59:36 +02:00
parent a586d42c3b
commit 842ec43c2f
2 changed files with 47 additions and 72 deletions

View File

@ -1,7 +1,6 @@
package splitstore package splitstore
import ( import (
"context"
"os" "os"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -20,7 +19,7 @@ type TrackingStore interface {
PutBatch([]cid.Cid, abi.ChainEpoch) error PutBatch([]cid.Cid, abi.ChainEpoch) error
Get(cid.Cid) (abi.ChainEpoch, error) Get(cid.Cid) (abi.ChainEpoch, error)
Delete(cid.Cid) error Delete(cid.Cid) error
Keys(context.Context) (<-chan cid.Cid, error) ForEach(func(cid.Cid, abi.ChainEpoch) error) error
Close() error Close() error
} }
@ -40,9 +39,6 @@ func NewTrackingStore(path string) (TrackingStore, error) {
if err = env.SetMaxDBs(1); err != nil { if err = env.SetMaxDBs(1); err != nil {
return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err) return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err)
} }
// if err = env.SetMaxReaders(2); err != nil {
// return nil, xerrors.Errorf("failed to set LMDB max readers: %w", err)
// }
if st, err := os.Stat(path); os.IsNotExist(err) { if st, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil { if err := os.MkdirAll(path, 0777); err != nil {
@ -129,47 +125,37 @@ func (s *trackingStore) Delete(cid cid.Cid) error {
}) })
} }
func (s *trackingStore) Keys(ctx context.Context) (<-chan cid.Cid, error) { func (s *trackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
ch := make(chan cid.Cid) return withMaxReadersRetry(
go func() { func() error {
defer close(ch) return s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
cur, err := txn.OpenCursor(s.db)
if err != nil {
return err
}
defer cur.Close()
err := withMaxReadersRetry( for {
func() error { k, v, err := cur.Get(nil, nil, lmdb.Next)
return s.env.View(func(txn *lmdb.Txn) error { if err != nil {
if lmdb.IsNotFound(err) {
return nil
}
txn.RawRead = true return err
cur, err := txn.OpenCursor(s.db) }
cid := cid.NewCidV1(cid.Raw, k)
epoch := bytesToEpoch(v)
err = f(cid, epoch)
if err != nil { if err != nil {
return err return err
} }
defer cur.Close() }
for {
k, _, err := cur.Get(nil, nil, lmdb.Next)
if err != nil {
if lmdb.IsNotFound(err) {
return nil
}
return err
}
select {
case ch <- cid.NewCidV1(cid.Raw, k):
case <-ctx.Done():
return nil
}
}
})
}) })
})
if err != nil {
log.Errorf("error iterating over tracking store keys: %s", err)
}
}()
return ch, nil
} }
func (s *trackingStore) Close() error { func (s *trackingStore) Close() error {

View File

@ -9,6 +9,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"golang.org/x/xerrors"
"github.com/ledgerwatch/lmdb-go/lmdb" "github.com/ledgerwatch/lmdb-go/lmdb"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
@ -369,51 +371,37 @@ func (s *SplitStore) compact() {
// Phase 2: sweep cold objects: // Phase 2: sweep cold objects:
// - If a cold object is reachable in the hot range, it stays in the hotstore. // - If a cold object is reachable in the hot range, it stays in the hotstore.
// - If a cold object is reachable in the cold range, it is moved to the coldstore. // - If a cold object is reachable in the cold range, it is moved to the coldstore.
// - If a cold object is unreachable, it is deleted. // - If a cold object is unreachable, it is deleted if GC is enabled, otherwise moved to the coldstore.
ch, err := s.snoop.Keys(context.Background())
if err != nil {
// TODO do something better here
panic(err)
}
startSweep := time.Now() startSweep := time.Now()
log.Info("sweeping cold objects") log.Info("sweeping cold objects")
// some stats for logging // some stats for logging
var stHot, stCold, stDead int var stHot, stCold, stDead int
for cid := range ch { err = s.snoop.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
wrEpoch, err := s.snoop.Get(cid)
if err != nil {
// TODO do something better here
panic(err)
}
// is the object stil hot? // is the object stil hot?
if wrEpoch > coldEpoch { if wrEpoch > coldEpoch {
// yes, stay in the hotstore // yes, stay in the hotstore
stHot++ stHot++
continue return nil
} }
// the object is cold -- check whether it is reachable in the hot range // the object is cold -- check whether it is reachable in the hot range
mark, err := hotSet.Has(cid) mark, err := hotSet.Has(cid)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error checking live mark for %s: %w", cid, err)
panic(err)
} }
if mark { if mark {
// the object is reachable in the hot range, stay in the hotstore // the object is reachable in the hot range, stay in the hotstore
stHot++ stHot++
continue return nil
} }
// check whether it is reachable in the cold range // check whether it is reachable in the cold range
mark, err = coldSet.Has(cid) mark, err = coldSet.Has(cid)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error checkiing cold set for %s: %w", cid, err)
panic(err)
} }
if s.enableGC { if s.enableGC {
@ -421,14 +409,12 @@ func (s *SplitStore) compact() {
// the object is reachable in the cold range, move it to the cold store // the object is reachable in the cold range, move it to the cold store
blk, err := s.hot.Get(cid) blk, err := s.hot.Get(cid)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w ", cid, err)
panic(err)
} }
err = s.cold.Put(blk) err = s.cold.Put(blk)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error puting block %s to coldstore: %w", cid, err)
panic(err)
} }
stCold++ stCold++
@ -440,14 +426,12 @@ func (s *SplitStore) compact() {
// if GC is disabled, we move both cold and dead objects to the coldstore // if GC is disabled, we move both cold and dead objects to the coldstore
blk, err := s.hot.Get(cid) blk, err := s.hot.Get(cid)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w ", cid, err)
panic(err)
} }
err = s.cold.Put(blk) err = s.cold.Put(blk)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error puting block %s to coldstore: %w", cid, err)
panic(err)
} }
if mark { if mark {
@ -460,16 +444,21 @@ func (s *SplitStore) compact() {
// delete the object from the hotstore // delete the object from the hotstore
err = s.hot.DeleteBlock(cid) err = s.hot.DeleteBlock(cid)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error deleting block %s from hotstore: %w", cid, err)
panic(err)
} }
// remove the snoop tracking // remove the snoop tracking
err = s.snoop.Delete(cid) err = s.snoop.Delete(cid)
if err != nil { if err != nil {
// TODO do something better here return xerrors.Errorf("error deleting cid %s from tracking store: %w", cid, err)
panic(err)
} }
return nil
})
if err != nil {
// TODO do something better here
panic(err)
} }
log.Infow("sweeping done", "took", time.Since(startSweep)) log.Infow("sweeping done", "took", time.Since(startSweep))