implement LMDB-backed tracking store
This commit is contained in:
parent
83f8a0ab12
commit
0d7476c5b2
@ -41,6 +41,8 @@ func (s *liveSet) Mark(cid cid.Cid) error {
|
||||
|
||||
func (s *liveSet) Has(cid cid.Cid) (has bool, err error) {
|
||||
err = s.env.View(func(txn *lmdb.Txn) error {
|
||||
txn.RawRead = true
|
||||
|
||||
_, err := txn.Get(s.db, cid.Hash())
|
||||
if err != nil {
|
||||
if lmdb.IsNotFound(err) {
|
||||
|
@ -1,15 +1,161 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/bmatsuo/lmdb-go/lmdb"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
var TrackingStoreMapSize int64 = 1 << 34 // 16G
|
||||
|
||||
type TrackingStore interface {
|
||||
Put(cid.Cid, abi.ChainEpoch) error
|
||||
PutBatch([]cid.Cid, abi.ChainEpoch) error
|
||||
Get(cid.Cid) (abi.ChainEpoch, error)
|
||||
Delete(cid.Cid) error
|
||||
Keys() (<-chan cid.Cid, error)
|
||||
Keys(context.Context) (<-chan cid.Cid, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type trackingStore struct {
|
||||
env *lmdb.Env
|
||||
db lmdb.DBI
|
||||
}
|
||||
|
||||
func NewTrackingStore(path string) (TrackingStore, error) {
|
||||
env, err := lmdb.NewEnv()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize LMDB env: %w", err)
|
||||
}
|
||||
if err = env.SetMapSize(TrackingStoreMapSize); err != nil {
|
||||
return nil, fmt.Errorf("failed to set LMDB map size: %w", err)
|
||||
}
|
||||
if err = env.SetMaxDBs(1); err != nil {
|
||||
return nil, fmt.Errorf("failed to set LMDB max dbs: %w", err)
|
||||
}
|
||||
if err = env.SetMaxReaders(1); err != nil {
|
||||
return nil, fmt.Errorf("failed to set LMDB max readers: %w", err)
|
||||
}
|
||||
|
||||
if st, err := os.Stat(path); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
return nil, fmt.Errorf("failed to create LMDB data directory at %s: %w", path, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat LMDB data dir: %w", err)
|
||||
} else if !st.IsDir() {
|
||||
return nil, fmt.Errorf("LMDB path is not a directory %s", path)
|
||||
}
|
||||
|
||||
err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777)
|
||||
if err != nil {
|
||||
env.Close() //nolint:errcheck
|
||||
return nil, fmt.Errorf("error opening LMDB database: %w", err)
|
||||
}
|
||||
|
||||
s := new(trackingStore)
|
||||
s.env = env
|
||||
err = env.Update(func(txn *lmdb.Txn) (err error) {
|
||||
s.db, err = txn.CreateDBI("snoop")
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *trackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
return txn.Put(s.db, cid.Hash(), val, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *trackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
for _, cid := range cids {
|
||||
err := txn.Put(s.db, cid.Hash(), val, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *trackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
|
||||
err = s.env.View(func(txn *lmdb.Txn) error {
|
||||
txn.RawRead = true
|
||||
|
||||
val, err := txn.Get(s.db, cid.Hash())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
epoch = bytesToEpoch(val)
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *trackingStore) Delete(cid cid.Cid) error {
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
return txn.Del(s.db, cid.Hash(), nil)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *trackingStore) Keys(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
ch := make(chan cid.Cid)
|
||||
go func() {
|
||||
err := s.env.View(func(txn *lmdb.Txn) error {
|
||||
defer close(ch)
|
||||
|
||||
txn.RawRead = true
|
||||
cur, err := txn.OpenCursor(s.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cur.Close()
|
||||
|
||||
for {
|
||||
k, _, err := cur.Get(nil, nil, lmdb.Next)
|
||||
if err != nil {
|
||||
if lmdb.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case ch <- cid.NewCidV1(cid.Raw, k):
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("error iterating over tracking store keys: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (s *trackingStore) Close() error {
|
||||
s.env.CloseDBI(s.db)
|
||||
return s.env.Close()
|
||||
}
|
||||
|
@ -183,11 +183,7 @@ func (s *SplitStore) Start(cs *store.ChainStore) error {
|
||||
bs, err := s.ds.Get(baseEpochKey)
|
||||
switch err {
|
||||
case nil:
|
||||
epoch, n := binary.Uvarint(bs)
|
||||
if n < 0 {
|
||||
panic("bogus base epoch")
|
||||
}
|
||||
s.baseEpoch = abi.ChainEpoch(epoch)
|
||||
s.baseEpoch = bytesToEpoch(bs)
|
||||
|
||||
case dstore.ErrNotFound:
|
||||
err = s.setBaseEpoch(s.curTs.Height())
|
||||
@ -307,7 +303,7 @@ func (s *SplitStore) compact() {
|
||||
// - If a cold object is reachable in the hot range, it stays in the hotstore.
|
||||
// - If a cold object is reachable in the cold range, it is moved to the coldstore.
|
||||
// - If a cold object is unreachable, it is deleted.
|
||||
ch, err := s.snoop.Keys()
|
||||
ch, err := s.snoop.Keys(context.Background())
|
||||
if err != nil {
|
||||
// TODO do something better here
|
||||
panic(err)
|
||||
@ -400,8 +396,19 @@ func (s *SplitStore) compact() {
|
||||
func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
|
||||
s.baseEpoch = epoch
|
||||
// write to datastore
|
||||
bs := make([]byte, 16)
|
||||
n := binary.PutUvarint(bs, uint64(epoch))
|
||||
bs = bs[:n]
|
||||
return s.ds.Put(baseEpochKey, bs)
|
||||
return s.ds.Put(baseEpochKey, epochToBytes(epoch))
|
||||
}
|
||||
|
||||
func epochToBytes(epoch abi.ChainEpoch) []byte {
|
||||
buf := make([]byte, 16)
|
||||
n := binary.PutUvarint(buf, uint64(epoch))
|
||||
return buf[:n]
|
||||
}
|
||||
|
||||
func bytesToEpoch(buf []byte) abi.ChainEpoch {
|
||||
epoch, n := binary.Uvarint(buf)
|
||||
if n < 0 {
|
||||
panic("bogus base epoch bytes")
|
||||
}
|
||||
return abi.ChainEpoch(epoch)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user