refactor marksets for critical section on-disk persistence

This commit is contained in:
vyzo 2022-01-28 15:41:33 +02:00
parent ff10e0eaf1
commit 45c2f34295
7 changed files with 252 additions and 36 deletions

View File

@ -16,13 +16,21 @@ type MarkSet interface {
Mark(cid.Cid) error
Has(cid.Cid) (bool, error)
Close() error
// BeginCriticalSection ensures that the markset is persisted to disk for recovery in case
// of abnormal termination during the critical section span.
BeginCriticalSection() error
// EndCriticalSection ends the critical section span.
EndCriticalSection()
}
type MarkSetEnv interface {
// Create creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments
// New creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem for on-disk persistence.
// sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error)
New(name string, sizeHint int64) (MarkSet, error)
// Recover recovers an existing markset persisted on-disk.
Recover(name string) (MarkSet, error)
// Close closes the markset environment
Close() error
}
@ -30,7 +38,7 @@ type MarkSetEnv interface {
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype {
case "map":
return NewMapMarkSetEnv()
return NewMapMarkSetEnv(path)
case "badger":
return NewBadgerMarkSetEnv(path)
default:

View File

@ -28,6 +28,7 @@ type BadgerMarkSet struct {
writers int
seqno int
version int
persist bool
db *badger.DB
path string
@ -47,11 +48,10 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil
}
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
name += ".tmp"
func (e *BadgerMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)
db, err := openTransientBadgerDB(path)
db, err := openBadgerDB(path, false)
if err != nil {
return nil, xerrors.Errorf("error creating badger db: %w", err)
}
@ -67,8 +67,43 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
return ms, nil
}
// Recover reopens the badger-backed markset stored under name inside the
// environment directory. The database is opened in recovery mode, so the
// existing directory is not cleared first, and the returned markset starts
// with persist already enabled.
func (e *BadgerMarkSetEnv) Recover(name string) (MarkSet, error) {
	dir := filepath.Join(e.path, name)

	// recover=true: keep whatever is already on disk
	bdb, err := openBadgerDB(dir, true)
	if err != nil {
		return nil, xerrors.Errorf("error creating badger db: %w", err)
	}

	recovered := new(BadgerMarkSet)
	recovered.pend = make(map[string]struct{})
	recovered.writing = make(map[int]map[string]struct{})
	recovered.db = bdb
	recovered.path = dir
	recovered.persist = true
	recovered.cond.L = &recovered.mx

	return recovered, nil
}
func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path)
return nil
}
// BeginCriticalSection turns on the persist flag under the markset lock.
// While the flag is set, put no longer waits for a full batch before
// requesting a write, and Close leaves the database directory on disk.
// It always returns nil.
func (s *BadgerMarkSet) BeginCriticalSection() error {
	s.mx.Lock()
	s.persist = true
	s.mx.Unlock()

	return nil
}
// EndCriticalSection turns the persist flag back off under the markset lock,
// restoring batched writes and removal of the database directory on Close.
func (s *BadgerMarkSet) EndCriticalSection() {
	s.mx.Lock()
	s.persist = false
	s.mx.Unlock()
}
func (s *BadgerMarkSet) Mark(c cid.Cid) error {
@ -193,7 +228,7 @@ func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
// writer holds the exclusive lock
func (s *BadgerMarkSet) put(key string) (write bool, seqno int) {
s.pend[key] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize {
if !s.persist && len(s.pend) < badgerMarkSetBatchSize {
return false, 0
}
@ -266,21 +301,23 @@ func (s *BadgerMarkSet) Close() error {
db := s.db
s.db = nil
return closeTransientBadgerDB(db, s.path)
return closeBadgerDB(db, s.path, s.persist)
}
// SetConcurrent is a no-op for BadgerMarkSet.
func (s *BadgerMarkSet) SetConcurrent() {}
func openTransientBadgerDB(path string) (*badger.DB, error) {
// clean up first
err := os.RemoveAll(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}
func openBadgerDB(path string, recover bool) (*badger.DB, error) {
// if it is not a recovery, clean up first
if !recover {
err := os.RemoveAll(path)
if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err)
}
err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
err = os.MkdirAll(path, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}
}
opts := badger.DefaultOptions(path)
@ -302,12 +339,16 @@ func openTransientBadgerDB(path string) (*badger.DB, error) {
return badger.Open(opts)
}
func closeTransientBadgerDB(db *badger.DB, path string) error {
func closeBadgerDB(db *badger.DB, path string, persist bool) error {
err := db.Close()
if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err)
}
if persist {
return nil
}
err = os.RemoveAll(path)
if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err)

View File

@ -1,37 +1,104 @@
package splitstore
import (
"bufio"
"io"
"os"
"path/filepath"
"sync"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
)
type MapMarkSetEnv struct{}
type MapMarkSetEnv struct {
path string
}
var _ MarkSetEnv = (*MapMarkSetEnv)(nil)
// MapMarkSet is a markset backed by an in-memory map, which can additionally
// be mirrored to a file on disk while a critical section is active.
type MapMarkSet struct {
// mx is taken by the methods of MapMarkSet before they touch the fields below.
mx sync.RWMutex
// set holds the marked keys, keyed by the raw bytes of the cid's hash.
// A nil set means the markset has been closed.
set map[string]struct{}
// persist is true while marks are also being written to the file at path.
persist bool
// file is the open on-disk markset file; nil when not persisting.
file *os.File
// buf is the buffered writer wrapping file.
buf *bufio.Writer
// path is the filesystem location of the on-disk markset file.
path string
}
var _ MarkSet = (*MapMarkSet)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil
func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) {
msPath := filepath.Join(path, "markset.map")
err := os.MkdirAll(msPath, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
}
return &MapMarkSetEnv{path: msPath}, nil
}
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
set: make(map[string]struct{}, sizeHint),
path: path,
}, nil
}
// Recover rebuilds a markset from the file previously persisted under name
// inside the environment directory, then reopens that file for appending so
// that subsequent marks keep being persisted.
//
// The file is a sequence of records, each consisting of one length byte
// followed by that many key bytes (the format produced by writeKey). An error
// is returned if the file cannot be opened or if it ends in the middle of a
// record.
func (e *MapMarkSetEnv) Recover(name string) (MarkSet, error) {
	path := filepath.Join(e.path, name)
	s := &MapMarkSet{
		set:  make(map[string]struct{}),
		path: path,
	}

	in, err := os.Open(path)
	if err != nil {
		return nil, xerrors.Errorf("error opening markset file for read: %w", err)
	}
	defer in.Close() //nolint:errcheck

	// wrap a buffered reader to make this faster
	buf := bufio.NewReader(in)
	for {
		var sz byte
		if sz, err = buf.ReadByte(); err != nil {
			break
		}

		key := make([]byte, int(sz))
		// bufio.Reader.Read may return fewer bytes than requested without an
		// error (e.g. when the key straddles the internal buffer boundary),
		// which would store a partially-zero key and misalign every record
		// after it; io.ReadFull reads the whole key or fails.
		if _, err = io.ReadFull(buf, key); err != nil {
			if err == io.EOF {
				// io.ReadFull reports a plain EOF when no key byte at all
				// could be read; after a non-zero length byte that is still
				// a truncated record, not a clean end of file.
				err = io.ErrUnexpectedEOF
			}
			break
		}

		s.set[string(key)] = struct{}{}
	}

	// a clean end of file on a record boundary is the only normal way out of
	// the loop above
	if err != io.EOF {
		return nil, xerrors.Errorf("error reading markset file: %w", err)
	}

	file, err := os.OpenFile(s.path, os.O_WRONLY|os.O_APPEND, 0)
	if err != nil {
		return nil, xerrors.Errorf("error opening markset file for write: %w", err)
	}

	s.persist = true
	s.file = file
	s.buf = bufio.NewWriter(file)

	return s, nil
}
// Close is a no-op for the map markset environment; it always returns nil.
func (e *MapMarkSetEnv) Close() error {
return nil
}
func (s *MapMarkSet) Mark(cid cid.Cid) error {
func (s *MapMarkSet) BeginCriticalSection() error {
s.mx.Lock()
defer s.mx.Unlock()
@ -39,7 +106,66 @@ func (s *MapMarkSet) Mark(cid cid.Cid) error {
return errMarkSetClosed
}
s.set[string(cid.Hash())] = struct{}{}
if s.persist {
return nil
}
file, err := os.OpenFile(s.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return xerrors.Errorf("error opening markset file: %w", err)
}
// wrap a buffered writer to make this faster
s.buf = bufio.NewWriter(file)
for key := range s.set {
if err := s.writeKey([]byte(key), false); err != nil {
_ = file.Close()
s.buf = nil
return err
}
}
if err := s.buf.Flush(); err != nil {
_ = file.Close()
s.buf = nil
return xerrors.Errorf("error flushing markset file buffer: %w", err)
}
s.file = file
s.persist = true
return nil
}
// EndCriticalSection stops on-disk persistence: if the markset is currently
// persisting, the backing file is closed and removed (errors from both are
// ignored), the file and buffer references are dropped, and the persist flag
// is cleared. It does nothing when persistence is not active.
func (s *MapMarkSet) EndCriticalSection() {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.persist {
		// best effort cleanup; failures here are deliberately not reported
		_ = s.file.Close()
		_ = os.Remove(s.path)

		s.file, s.buf = nil, nil
		s.persist = false
	}
}
// Mark adds the hash of c to the in-memory set and, while persistence is
// active, also appends it to the on-disk file and flushes it immediately.
// It returns errMarkSetClosed if the markset has already been closed, or the
// write error if persisting the key fails.
func (s *MapMarkSet) Mark(c cid.Cid) error {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.set == nil {
		return errMarkSetClosed
	}

	mh := c.Hash()
	s.set[string(mh)] = struct{}{}

	if !s.persist {
		return nil
	}

	return s.writeKey(mh, true)
}
@ -63,12 +189,20 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
return false, errMarkSetClosed
}
key := string(c.Hash())
hash := c.Hash()
key := string(hash)
if _, ok := s.set[key]; ok {
return false, nil
}
s.set[key] = struct{}{}
if s.persist {
if err := s.writeKey(hash, true); err != nil {
return false, err
}
}
return true, nil
}
@ -76,6 +210,39 @@ func (s *MapMarkSet) Close() error {
s.mx.Lock()
defer s.mx.Unlock()
if s.set == nil {
return nil
}
s.set = nil
if s.file != nil {
if err := s.file.Close(); err != nil {
log.Warnf("error closing markset file: %s", err)
}
if !s.persist {
if err := os.Remove(s.path); err != nil {
log.Warnf("error removing markset file: %s", err)
}
}
}
return nil
}
// writeKey appends one record for k to the buffered markset file: a single
// length byte followed by the key bytes. When flush is true the buffer is
// flushed afterwards so the record reaches the underlying file immediately.
//
// Keys longer than 255 bytes cannot be represented in the one-byte length
// prefix and are rejected with an error before anything is written.
func (s *MapMarkSet) writeKey(k []byte, flush bool) error {
	// byte(len(k)) would silently wrap for longer keys and corrupt the
	// framing of this record and every record after it
	const maxKeyLen = 255
	if len(k) > maxKeyLen {
		return xerrors.Errorf("error writing markset key to disk: key length %d exceeds %d bytes", len(k), maxKeyLen)
	}

	if err := s.buf.WriteByte(byte(len(k))); err != nil {
		return xerrors.Errorf("error writing markset key length to disk: %w", err)
	}
	if _, err := s.buf.Write(k); err != nil {
		return xerrors.Errorf("error writing markset key to disk: %w", err)
	}

	if flush {
		if err := s.buf.Flush(); err != nil {
			return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
		}
	}

	return nil
}

View File

@ -42,12 +42,12 @@ func testMarkSet(t *testing.T, lsType string) {
}
defer env.Close() //nolint:errcheck
hotSet, err := env.Create("hot", 0)
hotSet, err := env.New("hot", 0)
if err != nil {
t.Fatal(err)
}
coldSet, err := env.Create("cold", 0)
coldSet, err := env.New("cold", 0)
if err != nil {
t.Fatal(err)
}
@ -114,12 +114,12 @@ func testMarkSet(t *testing.T, lsType string) {
t.Fatal(err)
}
hotSet, err = env.Create("hot", 0)
hotSet, err = env.New("hot", 0)
if err != nil {
t.Fatal(err)
}
coldSet, err = env.Create("cold", 0)
coldSet, err = env.New("cold", 0)
if err != nil {
t.Fatal(err)
}
@ -167,7 +167,7 @@ func testMarkSetVisitor(t *testing.T, lsType string) {
}
defer env.Close() //nolint:errcheck
visitor, err := env.Create("test", 0)
visitor, err := env.New("test", 0)
if err != nil {
t.Fatal(err)
}

View File

@ -89,7 +89,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
coldCnt := new(int64)
missingCnt := new(int64)
visitor, err := s.markSetEnv.Create("check", 0)
visitor, err := s.markSetEnv.New("check", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}

View File

@ -398,7 +398,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
markSet, err := s.markSetEnv.New("live", s.markSetSize)
if err != nil {
return xerrors.Errorf("error creating mark set: %w", err)
}

View File

@ -62,7 +62,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
xcount := new(int64)
missing := new(int64)
visitor, err := s.markSetEnv.Create("warmup", 0)
visitor, err := s.markSetEnv.New("warmup", 0)
if err != nil {
return xerrors.Errorf("error creating visitor: %w", err)
}