Merge pull request #6833 from filecoin-project/feat/splitstore-badger-markset
Splitstore: support on-disk marksets using badger
This commit is contained in:
commit
02a37a1e22
@ -27,9 +27,38 @@ If you intend to use the discard coldstore, your also need to add the following:
|
||||
ColdStoreType = "discard"
|
||||
```
|
||||
In general you _should not_ have to use the discard store, unless you
|
||||
are running a network booster or have very constrained hardware with
|
||||
not enough disk space to maintain a coldstore, even with garbage
|
||||
collection.
|
||||
are running a network assistive node (like a bootstrapper or booster)
|
||||
or have very constrained hardware with not enough disk space to
|
||||
maintain a coldstore, even with garbage collection. It is also appropriate
|
||||
for small nodes that are simply watching the chain.
|
||||
|
||||
*Warning:* Using the discard store for a general purpose node is discouraged, unless
|
||||
you really know what you are doing. Use it at your own risk.
|
||||
|
||||
## Configuration Options
|
||||
|
||||
These are options in the `[Chainstore.Splitstore]` section of the configuration:
|
||||
|
||||
- `HotStoreType` -- specifies the type of hotstore to use.
|
||||
The only currently supported option is `"badger"`.
|
||||
- `ColdStoreType` -- specifies the type of coldstore to use.
|
||||
The default value is `"universal"`, which will use the initial monolith blockstore
|
||||
as the coldstore.
|
||||
The other possible value is `"discard"`, as outlined above, which is specialized for
|
||||
running without a coldstore. Note that the discard store wraps the initial monolith
|
||||
blockstore and discards writes; this is necessary to support syncing from a snapshot.
|
||||
- `MarkSetType` -- specifies the type of markset to use during compaction.
|
||||
The markset is the data structure used by compaction/gc to track live objects.
|
||||
The default value is `"map"`, which will use an in-memory map; if you are limited
|
||||
in memory (or indeed see compaction run out of memory), you can also specify
|
||||
`"badger"` which will use an disk backed markset, using badger. This will use
|
||||
much less memory, but will also make compaction slower.
|
||||
- `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4
|
||||
finalities maintained by default, to maintain messages and message receipts in the
|
||||
hotstore. This is useful for assistive nodes that want to support syncing for other
|
||||
nodes beyond 4 finalities, while running with the discard coldstore option.
|
||||
It is also useful for miners who accept deals and need to lookback messages beyond
|
||||
the 4 finalities, which would otherwise hit the coldstore.
|
||||
|
||||
|
||||
## Operation
|
||||
@ -67,6 +96,6 @@ Compaction works transactionally with the following algorithm:
|
||||
- We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
|
||||
- We then end the transaction and compact/gc the hotstore.
|
||||
|
||||
## Coldstore Garbage Collection
|
||||
## Garbage Collection
|
||||
|
||||
TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)
|
||||
|
@ -32,6 +32,8 @@ func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
|
||||
return NewBloomMarkSetEnv()
|
||||
case "map":
|
||||
return NewMapMarkSetEnv()
|
||||
case "badger":
|
||||
return NewBadgerMarkSetEnv(path)
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown mark set type %s", mtype)
|
||||
}
|
||||
|
230
blockstore/splitstore/markset_badger.go
Normal file
230
blockstore/splitstore/markset_badger.go
Normal file
@ -0,0 +1,230 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/dgraph-io/badger/v2"
|
||||
"github.com/dgraph-io/badger/v2/options"
|
||||
"go.uber.org/zap"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
type BadgerMarkSetEnv struct {
|
||||
path string
|
||||
}
|
||||
|
||||
var _ MarkSetEnv = (*BadgerMarkSetEnv)(nil)
|
||||
|
||||
type BadgerMarkSet struct {
|
||||
mx sync.RWMutex
|
||||
cond sync.Cond
|
||||
pend map[string]struct{}
|
||||
writing map[int]map[string]struct{}
|
||||
writers int
|
||||
seqno int
|
||||
|
||||
db *badger.DB
|
||||
path string
|
||||
}
|
||||
|
||||
var _ MarkSet = (*BadgerMarkSet)(nil)
|
||||
|
||||
var badgerMarkSetBatchSize = 16384
|
||||
|
||||
func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
|
||||
msPath := filepath.Join(path, "markset.badger")
|
||||
err := os.MkdirAll(msPath, 0755) //nolint:gosec
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error creating markset directory: %w", err)
|
||||
}
|
||||
|
||||
return &BadgerMarkSetEnv{path: msPath}, nil
|
||||
}
|
||||
|
||||
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
|
||||
path := filepath.Join(e.path, name)
|
||||
|
||||
// clean up first
|
||||
err := os.RemoveAll(path)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
|
||||
}
|
||||
|
||||
err = os.MkdirAll(path, 0755) //nolint:gosec
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error creating markset directory: %w", err)
|
||||
}
|
||||
|
||||
opts := badger.DefaultOptions(path)
|
||||
opts.SyncWrites = false
|
||||
opts.CompactL0OnClose = false
|
||||
opts.Compression = options.None
|
||||
// Note: We use FileIO for loading modes to avoid memory thrashing and interference
|
||||
// between the system blockstore and the markset.
|
||||
// It was observed that using the default memory mapped option resulted in
|
||||
// significant interference and unacceptably high block validation times once the markset
|
||||
// exceeded 1GB in size.
|
||||
opts.TableLoadingMode = options.FileIO
|
||||
opts.ValueLogLoadingMode = options.FileIO
|
||||
opts.Logger = &badgerLogger{
|
||||
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
|
||||
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
|
||||
}
|
||||
|
||||
db, err := badger.Open(opts)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error creating badger markset: %w", err)
|
||||
}
|
||||
|
||||
ms := &BadgerMarkSet{
|
||||
pend: make(map[string]struct{}),
|
||||
writing: make(map[int]map[string]struct{}),
|
||||
db: db,
|
||||
path: path,
|
||||
}
|
||||
ms.cond.L = &ms.mx
|
||||
|
||||
return ms, nil
|
||||
}
|
||||
|
||||
func (e *BadgerMarkSetEnv) Close() error {
|
||||
return os.RemoveAll(e.path)
|
||||
}
|
||||
|
||||
func (s *BadgerMarkSet) Mark(c cid.Cid) error {
|
||||
s.mx.Lock()
|
||||
|
||||
if s.pend == nil {
|
||||
s.mx.Unlock()
|
||||
return errMarkSetClosed
|
||||
}
|
||||
|
||||
s.pend[string(c.Hash())] = struct{}{}
|
||||
|
||||
if len(s.pend) < badgerMarkSetBatchSize {
|
||||
s.mx.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
pend := s.pend
|
||||
seqno := s.seqno
|
||||
s.seqno++
|
||||
s.writing[seqno] = pend
|
||||
s.pend = make(map[string]struct{})
|
||||
s.writers++
|
||||
s.mx.Unlock()
|
||||
|
||||
defer func() {
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
|
||||
delete(s.writing, seqno)
|
||||
s.writers--
|
||||
if s.writers == 0 {
|
||||
s.cond.Broadcast()
|
||||
}
|
||||
}()
|
||||
|
||||
empty := []byte{} // not nil
|
||||
|
||||
batch := s.db.NewWriteBatch()
|
||||
defer batch.Cancel()
|
||||
|
||||
for k := range pend {
|
||||
if err := batch.Set([]byte(k), empty); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := batch.Flush()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error flushing batch to badger markset: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
|
||||
s.mx.RLock()
|
||||
defer s.mx.RUnlock()
|
||||
|
||||
if s.pend == nil {
|
||||
return false, errMarkSetClosed
|
||||
}
|
||||
|
||||
key := c.Hash()
|
||||
pendKey := string(key)
|
||||
_, ok := s.pend[pendKey]
|
||||
if ok {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
for _, wr := range s.writing {
|
||||
_, ok := wr[pendKey]
|
||||
if ok {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
err := s.db.View(func(txn *badger.Txn) error {
|
||||
_, err := txn.Get(key)
|
||||
return err
|
||||
})
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
return true, nil
|
||||
|
||||
case badger.ErrKeyNotFound:
|
||||
return false, nil
|
||||
|
||||
default:
|
||||
return false, xerrors.Errorf("error checking badger markset: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BadgerMarkSet) Close() error {
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
|
||||
if s.pend == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for s.writers > 0 {
|
||||
s.cond.Wait()
|
||||
}
|
||||
|
||||
s.pend = nil
|
||||
db := s.db
|
||||
s.db = nil
|
||||
|
||||
err := db.Close()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error closing badger markset: %w", err)
|
||||
}
|
||||
|
||||
err = os.RemoveAll(s.path)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error deleting badger markset: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BadgerMarkSet) SetConcurrent() {}
|
||||
|
||||
// badger logging through go-log
|
||||
type badgerLogger struct {
|
||||
*zap.SugaredLogger
|
||||
skip2 *zap.SugaredLogger
|
||||
}
|
||||
|
||||
func (b *badgerLogger) Warningf(format string, args ...interface{}) {}
|
||||
func (b *badgerLogger) Infof(format string, args ...interface{}) {}
|
||||
func (b *badgerLogger) Debugf(format string, args ...interface{}) {}
|
@ -16,6 +16,15 @@ func TestBloomMarkSet(t *testing.T) {
|
||||
testMarkSet(t, "bloom")
|
||||
}
|
||||
|
||||
func TestBadgerMarkSet(t *testing.T) {
|
||||
bs := badgerMarkSetBatchSize
|
||||
badgerMarkSetBatchSize = 1
|
||||
t.Cleanup(func() {
|
||||
badgerMarkSetBatchSize = bs
|
||||
})
|
||||
testMarkSet(t, "badger")
|
||||
}
|
||||
|
||||
func testMarkSet(t *testing.T, lsType string) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -62,8 +62,11 @@ func init() {
|
||||
type Config struct {
|
||||
// MarkSetType is the type of mark set to use.
|
||||
//
|
||||
// Only current sane value is "map", but we may add an option for a disk-backed
|
||||
// markset for memory-constrained situations.
|
||||
// The default value is "map", which uses an in-memory map-backed markset.
|
||||
// If you are constrained in memory (i.e. compaction runs out of memory), you
|
||||
// can use "badger", which will use a disk-backed markset using badger.
|
||||
// Note that compaction will take quite a bit longer when using the "badger" option,
|
||||
// but that shouldn't really matter (as long as it is under 7.5hrs).
|
||||
MarkSetType string
|
||||
|
||||
// DiscardColdBlocks indicates whether to skip moving cold blocks to the coldstore.
|
||||
|
@ -184,16 +184,6 @@ func (s *SplitStore) trackTxnRef(c cid.Cid) {
|
||||
return
|
||||
}
|
||||
|
||||
if s.txnProtect != nil {
|
||||
mark, err := s.txnProtect.Has(c)
|
||||
if err != nil {
|
||||
log.Warnf("error checking markset: %s", err)
|
||||
// track it anyways
|
||||
} else if mark {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
s.txnRefsMx.Lock()
|
||||
s.txnRefs[c] = struct{}{}
|
||||
s.txnRefsMx.Unlock()
|
||||
@ -209,27 +199,11 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
|
||||
s.txnRefsMx.Lock()
|
||||
defer s.txnRefsMx.Unlock()
|
||||
|
||||
quiet := false
|
||||
for _, c := range cids {
|
||||
if isUnitaryObject(c) {
|
||||
continue
|
||||
}
|
||||
|
||||
if s.txnProtect != nil {
|
||||
mark, err := s.txnProtect.Has(c)
|
||||
if err != nil {
|
||||
if !quiet {
|
||||
quiet = true
|
||||
log.Warnf("error checking markset: %s", err)
|
||||
}
|
||||
// track it anyways
|
||||
}
|
||||
|
||||
if mark {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
s.txnRefs[c] = struct{}{}
|
||||
}
|
||||
|
||||
|
@ -210,6 +210,15 @@ func TestSplitStoreCompaction(t *testing.T) {
|
||||
testSplitStore(t, &Config{MarkSetType: "map"})
|
||||
}
|
||||
|
||||
func TestSplitStoreCompactionWithBadger(t *testing.T) {
|
||||
bs := badgerMarkSetBatchSize
|
||||
badgerMarkSetBatchSize = 1
|
||||
t.Cleanup(func() {
|
||||
badgerMarkSetBatchSize = bs
|
||||
})
|
||||
testSplitStore(t, &Config{MarkSetType: "badger"})
|
||||
}
|
||||
|
||||
type mockChain struct {
|
||||
t testing.TB
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user