implement lmdb-backed LiveSet
This commit is contained in:
parent
df856b7315
commit
3f92a000c7
@ -1,6 +1,8 @@
|
|||||||
package splitstore
|
package splitstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/bmatsuo/lmdb-go/lmdb"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -9,3 +11,55 @@ type LiveSet interface {
|
|||||||
Has(cid.Cid) (bool, error)
|
Has(cid.Cid) (bool, error)
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type liveSet struct {
|
||||||
|
env *lmdb.Env
|
||||||
|
db lmdb.DBI
|
||||||
|
}
|
||||||
|
|
||||||
|
var markBytes = []byte{1}
|
||||||
|
|
||||||
|
func NewLiveSet(env *lmdb.Env, name string) (LiveSet, error) {
|
||||||
|
var db lmdb.DBI
|
||||||
|
err := env.Update(func(txn *lmdb.Txn) (err error) {
|
||||||
|
db, err = txn.CreateDBI(name)
|
||||||
|
return
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &liveSet{env: env, db: db}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *liveSet) Mark(cid cid.Cid) error {
|
||||||
|
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||||
|
return txn.Put(s.db, cid.Hash(), markBytes, 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *liveSet) Has(cid cid.Cid) (has bool, err error) {
|
||||||
|
err = s.env.View(func(txn *lmdb.Txn) error {
|
||||||
|
_, err := txn.Get(s.db, cid.Hash())
|
||||||
|
if err != nil {
|
||||||
|
if lmdb.IsNotFound(err) {
|
||||||
|
has = false
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
has = true
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *liveSet) Close() error {
|
||||||
|
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||||
|
return txn.Drop(s.db, true)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -7,6 +7,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/bmatsuo/lmdb-go/lmdb"
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
blocks "github.com/ipfs/go-block-format"
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
dstore "github.com/ipfs/go-datastore"
|
dstore "github.com/ipfs/go-datastore"
|
||||||
@ -39,6 +41,8 @@ type SplitStore struct {
|
|||||||
|
|
||||||
stateMx sync.Mutex
|
stateMx sync.Mutex
|
||||||
compacting bool
|
compacting bool
|
||||||
|
|
||||||
|
env *lmdb.Env
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ bstore.Blockstore = (*SplitStore)(nil)
|
var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||||
@ -201,6 +205,17 @@ func (s *SplitStore) Start(cs *store.ChainStore) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) Close() error {
|
||||||
|
if s.isCompacting() {
|
||||||
|
log.Warn("ongoing compaction; waiting for it to finish...")
|
||||||
|
for s.isCompacting() {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.env.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||||
s.curTs = apply[len(apply)-1]
|
s.curTs = apply[len(apply)-1]
|
||||||
epoch := s.curTs.Height()
|
epoch := s.curTs.Height()
|
||||||
@ -237,14 +252,14 @@ func (s *SplitStore) setCompacting(state bool) {
|
|||||||
func (s *SplitStore) compact() {
|
func (s *SplitStore) compact() {
|
||||||
// create two on disk live sets, one for marking the cold finality region
|
// create two on disk live sets, one for marking the cold finality region
|
||||||
// and one for marking the hot region
|
// and one for marking the hot region
|
||||||
hotSet, err := s.newLiveSet()
|
hotSet, err := NewLiveSet(s.env, "hot")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
defer hotSet.Close() //nolint:errcheck
|
defer hotSet.Close() //nolint:errcheck
|
||||||
|
|
||||||
coldSet, err := s.newLiveSet()
|
coldSet, err := NewLiveSet(s.env, "cold")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -390,8 +405,3 @@ func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
|
|||||||
bs = bs[:n]
|
bs = bs[:n]
|
||||||
return s.ds.Put(baseEpochKey, bs)
|
return s.ds.Put(baseEpochKey, bs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) newLiveSet() (LiveSet, error) {
|
|
||||||
// TODO implementation
|
|
||||||
return nil, errors.New("newLiveSet: IMPLEMENT ME!!!") //nolint
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user