housekeeping
- remove defunct tracking store implementations - update splitstore node config - use mark set type config option (defaulting to mapts); a memory constrained node may want to use an on-disk one
This commit is contained in:
parent
19d1b1f532
commit
190cb18ab0
@ -90,17 +90,12 @@ const (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// TrackingStore is the type of tracking store to use.
|
||||
//
|
||||
// Supported values are: "bolt" (default if omitted), "mem" (for tests and readonly access).
|
||||
TrackingStoreType string
|
||||
|
||||
// MarkSetType is the type of mark set to use.
|
||||
//
|
||||
// Supported values are: "bloom" (default if omitted), "bolt".
|
||||
// Sane values are: "mapts", "bolt" (if you are memory constrained).
|
||||
MarkSetType string
|
||||
|
||||
// SkipMoveColdBlocks indicates whether to skip moving cold blocks to the coldstore.
|
||||
// DiscardColdBlocks indicates whether to skip moving cold blocks to the coldstore.
|
||||
// If the splitstore is running with a noop coldstore then this option is set to true
|
||||
// which skips moving (as it is a noop, but still takes time to read all the cold objects)
|
||||
// and directly purges cold blocks.
|
||||
@ -167,13 +162,13 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
||||
}
|
||||
|
||||
// the markset env
|
||||
markSetEnv, err := OpenMarkSetEnv(path, "mapts")
|
||||
markSetEnv, err := OpenMarkSetEnv(path, cfg.MarkSetType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// the txn markset env
|
||||
txnEnv, err := OpenMarkSetEnv(path, "mapts")
|
||||
txnEnv, err := OpenMarkSetEnv(path, cfg.MarkSetType)
|
||||
if err != nil {
|
||||
_ = markSetEnv.Close()
|
||||
return nil, err
|
||||
|
@ -1,109 +0,0 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// TrackingStore is a persistent store that tracks blocks that are added
|
||||
// to the hotstore, tracking the epoch at which they are written.
|
||||
type TrackingStore interface {
|
||||
Put(cid.Cid, abi.ChainEpoch) error
|
||||
PutBatch([]cid.Cid, abi.ChainEpoch) error
|
||||
Get(cid.Cid) (abi.ChainEpoch, error)
|
||||
Delete(cid.Cid) error
|
||||
DeleteBatch([]cid.Cid) error
|
||||
ForEach(func(cid.Cid, abi.ChainEpoch) error) error
|
||||
Sync() error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// OpenTrackingStore opens a tracking store of the specified type in the
|
||||
// specified path.
|
||||
func OpenTrackingStore(path string, ttype string) (TrackingStore, error) {
|
||||
switch ttype {
|
||||
case "", "bolt":
|
||||
return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt"))
|
||||
case "mem":
|
||||
return NewMemTrackingStore(), nil
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown tracking store type %s", ttype)
|
||||
}
|
||||
}
|
||||
|
||||
// NewMemTrackingStore creates an in-memory tracking store.
|
||||
// This is only useful for test or situations where you don't want to open the
|
||||
// real tracking store (eg concurrent read only access on a node's datastore)
|
||||
func NewMemTrackingStore() *MemTrackingStore {
|
||||
return &MemTrackingStore{tab: make(map[cid.Cid]abi.ChainEpoch)}
|
||||
}
|
||||
|
||||
// MemTrackingStore is a simple in-memory tracking store
|
||||
type MemTrackingStore struct {
|
||||
sync.Mutex
|
||||
tab map[cid.Cid]abi.ChainEpoch
|
||||
}
|
||||
|
||||
var _ TrackingStore = (*MemTrackingStore)(nil)
|
||||
|
||||
func (s *MemTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.tab[cid] = epoch
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for _, cid := range cids {
|
||||
s.tab[cid] = epoch
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Get(cid cid.Cid) (abi.ChainEpoch, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
epoch, ok := s.tab[cid]
|
||||
if ok {
|
||||
return epoch, nil
|
||||
}
|
||||
return 0, xerrors.Errorf("missing tracking epoch for %s", cid)
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Delete(cid cid.Cid) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
delete(s.tab, cid)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) DeleteBatch(cids []cid.Cid) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for _, cid := range cids {
|
||||
delete(s.tab, cid)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for cid, epoch := range s.tab {
|
||||
err := f(cid, epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Sync() error { return nil }
|
||||
func (s *MemTrackingStore) Close() error { return nil }
|
@ -1,119 +0,0 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
type BoltTrackingStore struct {
|
||||
db *bolt.DB
|
||||
bucketId []byte
|
||||
}
|
||||
|
||||
var _ TrackingStore = (*BoltTrackingStore)(nil)
|
||||
|
||||
func OpenBoltTrackingStore(path string) (*BoltTrackingStore, error) {
|
||||
opts := &bolt.Options{
|
||||
Timeout: 1 * time.Second,
|
||||
}
|
||||
db, err := bolt.Open(path, 0644, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bucketId := []byte("tracker")
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(bucketId)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating bolt db bucket %s: %w", string(bucketId), err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BoltTrackingStore{db: db, bucketId: bucketId}, nil
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.Put(cid.Hash(), val)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
for _, cid := range cids {
|
||||
err := b.Put(cid.Hash(), val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
|
||||
err = s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
val := b.Get(cid.Hash())
|
||||
if val == nil {
|
||||
return xerrors.Errorf("missing tracking epoch for %s", cid)
|
||||
}
|
||||
epoch = bytesToEpoch(val)
|
||||
return nil
|
||||
})
|
||||
return epoch, err
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.Delete(cid.Hash())
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) DeleteBatch(cids []cid.Cid) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
for _, cid := range cids {
|
||||
err := b.Delete(cid.Hash())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error deleting %s", cid)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
|
||||
return s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.ForEach(func(k, v []byte) error {
|
||||
cid := cid.NewCidV1(cid.Raw, k)
|
||||
epoch := bytesToEpoch(v)
|
||||
return f(cid, epoch)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Sync() error {
|
||||
return s.db.Sync()
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
@ -1,130 +0,0 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
"github.com/multiformats/go-multihash"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
func TestBoltTrackingStore(t *testing.T) {
|
||||
testTrackingStore(t, "bolt")
|
||||
}
|
||||
|
||||
func testTrackingStore(t *testing.T, tsType string) {
|
||||
t.Helper()
|
||||
|
||||
makeCid := func(key string) cid.Cid {
|
||||
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return cid.NewCidV1(cid.Raw, h)
|
||||
}
|
||||
|
||||
mustHave := func(s TrackingStore, cid cid.Cid, epoch abi.ChainEpoch) {
|
||||
val, err := s.Get(cid)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if val != epoch {
|
||||
t.Fatal("epoch mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
mustNotHave := func(s TrackingStore, cid cid.Cid) {
|
||||
_, err := s.Get(cid)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
}
|
||||
|
||||
path, err := ioutil.TempDir("", "snoop-test.*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, err := OpenTrackingStore(path, tsType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
k1 := makeCid("a")
|
||||
k2 := makeCid("b")
|
||||
k3 := makeCid("c")
|
||||
k4 := makeCid("d")
|
||||
|
||||
s.Put(k1, 1) //nolint
|
||||
s.Put(k2, 2) //nolint
|
||||
s.Put(k3, 3) //nolint
|
||||
s.Put(k4, 4) //nolint
|
||||
|
||||
mustHave(s, k1, 1)
|
||||
mustHave(s, k2, 2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
s.Delete(k1) // nolint
|
||||
s.Delete(k2) // nolint
|
||||
|
||||
mustNotHave(s, k1)
|
||||
mustNotHave(s, k2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
s.PutBatch([]cid.Cid{k1}, 1) //nolint
|
||||
s.PutBatch([]cid.Cid{k2}, 2) //nolint
|
||||
|
||||
mustHave(s, k1, 1)
|
||||
mustHave(s, k2, 2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
allKeys := map[string]struct{}{
|
||||
k1.String(): {},
|
||||
k2.String(): {},
|
||||
k3.String(): {},
|
||||
k4.String(): {},
|
||||
}
|
||||
|
||||
err = s.ForEach(func(k cid.Cid, _ abi.ChainEpoch) error {
|
||||
_, ok := allKeys[k.String()]
|
||||
if !ok {
|
||||
t.Fatal("unexpected key")
|
||||
}
|
||||
|
||||
delete(allKeys, k.String())
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(allKeys) != 0 {
|
||||
t.Fatal("not all keys were returned")
|
||||
}
|
||||
|
||||
// no close and reopen and ensure the keys still exist
|
||||
err = s.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, err = OpenTrackingStore(path, tsType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
mustHave(s, k1, 1)
|
||||
mustHave(s, k2, 2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
s.Close() //nolint:errcheck
|
||||
}
|
@ -231,9 +231,7 @@ type Chainstore struct {
|
||||
type Splitstore struct {
|
||||
ColdStoreType string
|
||||
HotStoreType string
|
||||
TrackingStoreType string
|
||||
MarkSetType string
|
||||
HotHeaders bool
|
||||
}
|
||||
|
||||
// // Full Node
|
||||
@ -306,6 +304,7 @@ func DefaultFullNode() *FullNode {
|
||||
Splitstore: Splitstore{
|
||||
ColdStoreType: "universal",
|
||||
HotStoreType: "badger",
|
||||
MarkSetType: "mapts",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -78,7 +78,6 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
|
||||
}
|
||||
|
||||
cfg := &splitstore.Config{
|
||||
TrackingStoreType: cfg.Splitstore.TrackingStoreType,
|
||||
MarkSetType: cfg.Splitstore.MarkSetType,
|
||||
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "noop",
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user