Merge pull request #8008 from filecoin-project/feat/splitstore-sortless-compaction

splitstore sortless compaction
This commit is contained in:
Łukasz Magiera 2022-02-09 17:17:57 +00:00 committed by GitHub
commit 44fd0e3349
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1783 additions and 272 deletions

View File

@ -49,10 +49,11 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration:
blockstore and discards writes; this is necessary to support syncing from a snapshot. blockstore and discards writes; this is necessary to support syncing from a snapshot.
- `MarkSetType` -- specifies the type of markset to use during compaction. - `MarkSetType` -- specifies the type of markset to use during compaction.
The markset is the data structure used by compaction/gc to track live objects. The markset is the data structure used by compaction/gc to track live objects.
The default value is `"map"`, which will use an in-memory map; if you are limited The default value is "badger", which will use a disk backed markset using badger.
in memory (or indeed see compaction run out of memory), you can also specify If you have a lot of memory (48G or more) you can also use "map", which will use
`"badger"` which will use an disk backed markset, using badger. This will use an in memory markset, speeding up compaction at the cost of higher memory usage.
much less memory, but will also make compaction slower. Note: If you are using a VPS with a network volume, you need to provision at least
3000 IOPs with the badger markset.
- `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4 - `HotStoreMessageRetention` -- specifies how many finalities, beyond the 4
finalities maintained by default, to maintain messages and message receipts in the finalities maintained by default, to maintain messages and message receipts in the
hotstore. This is useful for assistive nodes that want to support syncing for other hotstore. This is useful for assistive nodes that want to support syncing for other
@ -105,6 +106,12 @@ Compaction works transactionally with the following algorithm:
- We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
- We then end the transaction and compact/gc the hotstore. - We then end the transaction and compact/gc the hotstore.
As of [#8008](https://github.com/filecoin-project/lotus/pull/8008) the compaction algorithm has been
modified to eliminate sorting and maintain the cold object set on disk. This drastically reduces
memory usage; in fact, when using badger as the markset, compaction uses very little memory, and
it should now be possible to run splitstore with 32GB of RAM or less without danger of running out of
memory during compaction.
## Garbage Collection ## Garbage Collection
TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577) TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)

View File

@ -0,0 +1,118 @@
package splitstore
import (
"bufio"
"io"
"os"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)
// Checkpoint is a file-backed record of a single CID. Set rewrites the record
// at the start of the file; OpenCheckpoint reads that first record back.
type Checkpoint struct {
// file is the write handle for the checkpoint file; Close sets it to nil.
file *os.File
// buf buffers writes to file; Set flushes it on every update.
buf *bufio.Writer
}
// NewCheckpoint creates the checkpoint file at path, truncating any existing
// content, and returns a Checkpoint that writes to it. The file is opened
// write-only with O_SYNC.
func NewCheckpoint(path string) (*Checkpoint, error) {
	const flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY | os.O_SYNC

	f, err := os.OpenFile(path, flags, 0644)
	if err != nil {
		return nil, xerrors.Errorf("error creating checkpoint: %w", err)
	}

	cp := &Checkpoint{file: f, buf: bufio.NewWriter(f)}
	return cp, nil
}
// OpenCheckpoint opens an existing checkpoint file at path. It returns a
// Checkpoint for further updates together with the CID stored in the first
// record of the file. If the file is empty the returned CID is cid.Undef.
func OpenCheckpoint(path string) (*Checkpoint, cid.Cid, error) {
	rd, err := os.Open(path)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for reading: %w", err)
	}
	defer rd.Close() //nolint:errcheck

	// io.EOF on the very first byte means an empty checkpoint, which is not an
	// error; readRawCid returns cid.Undef in that case.
	start, err := readRawCid(bufio.NewReader(rd), nil)
	switch err {
	case nil, io.EOF:
	default:
		return nil, cid.Undef, xerrors.Errorf("error reading cid from checkpoint: %w", err)
	}

	wr, err := os.OpenFile(path, os.O_WRONLY|os.O_SYNC, 0644)
	if err != nil {
		return nil, cid.Undef, xerrors.Errorf("error opening checkpoint for writing: %w", err)
	}

	return &Checkpoint{file: wr, buf: bufio.NewWriter(wr)}, start, nil
}
// Set overwrites the checkpoint with c: it rewinds the file to its start and
// writes the CID's raw record there, flushing the buffer immediately.
func (cp *Checkpoint) Set(c cid.Cid) error {
	_, err := cp.file.Seek(0, io.SeekStart)
	if err != nil {
		return xerrors.Errorf("error seeking beginning of checkpoint: %w", err)
	}

	err = writeRawCid(cp.buf, c, true)
	if err != nil {
		return xerrors.Errorf("error writing cid to checkpoint: %w", err)
	}

	return nil
}
// Close closes the underlying file and clears the Checkpoint's handles.
// Calling Close on an already closed Checkpoint is a no-op that returns nil.
func (cp *Checkpoint) Close() error {
	f := cp.file
	if f == nil {
		return nil
	}

	cp.file, cp.buf = nil, nil
	return f.Close()
}
// readRawCid reads one record from buf: a single length byte followed by that
// many bytes of multihash. It returns the hash wrapped as a CIDv1 with the raw
// codec.
//
// hbuf is an optional scratch buffer for the hash bytes. When it is nil or its
// capacity is smaller than the record, a fresh buffer is allocated instead, so
// a short scratch buffer can never cause a slice-bounds panic.
//
// An error reading the length byte (including io.EOF at a record boundary) is
// returned unwrapped so callers can compare it against io.EOF. Any failure
// after the length byte is wrapped, because it indicates a truncated or corrupt
// record.
func readRawCid(buf *bufio.Reader, hbuf []byte) (cid.Cid, error) {
	sz, err := buf.ReadByte()
	if err != nil {
		return cid.Undef, err // don't wrap EOF as it is not an error here
	}

	n := int(sz)
	if cap(hbuf) < n {
		// scratch buffer missing or too small for this record
		hbuf = make([]byte, n)
	} else {
		hbuf = hbuf[:n]
	}

	if _, err := io.ReadFull(buf, hbuf); err != nil {
		return cid.Undef, xerrors.Errorf("error reading hash: %w", err) // wrap EOF, it's corrupt
	}

	hash, err := mh.Cast(hbuf)
	if err != nil {
		return cid.Undef, xerrors.Errorf("error casting multihash: %w", err)
	}

	return cid.NewCidV1(cid.Raw, hash), nil
}
// writeRawCid writes the multihash of c to buf as one record: a single length
// byte followed by the hash bytes. When flush is true the buffer is flushed
// before returning.
//
// The length prefix is one byte, so hashes longer than 255 bytes cannot be
// represented. They are rejected with an error instead of being written with a
// truncated length, which would corrupt every subsequent record in the file.
func writeRawCid(buf *bufio.Writer, c cid.Cid, flush bool) error {
	const maxHashLen = 255 // largest value the one-byte length prefix can hold

	hash := c.Hash()
	if len(hash) > maxHashLen {
		return xerrors.Errorf("multihash too long for raw cid record: %d bytes", len(hash))
	}

	if err := buf.WriteByte(byte(len(hash))); err != nil {
		return err
	}
	if _, err := buf.Write(hash); err != nil {
		return err
	}

	if flush {
		return buf.Flush()
	}
	return nil
}

View File

@ -0,0 +1,147 @@
package splitstore
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
)
// TestCheckpoint exercises the create/set/close/reopen cycle of Checkpoint,
// first starting from a checkpoint that has been written to and then starting
// from an empty one.
func TestCheckpoint(t *testing.T) {
	dir, err := ioutil.TempDir("", "checkpoint.*")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		_ = os.RemoveAll(dir)
	})

	path := filepath.Join(dir, "checkpoint")

	makeCid := func(key string) cid.Cid {
		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}
		return cid.NewCidV1(cid.Raw, h)
	}

	k1, k2, k3, k4 := makeCid("a"), makeCid("b"), makeCid("c"), makeCid("d")

	// create makes a fresh (truncated) checkpoint at path.
	create := func() *Checkpoint {
		t.Helper()
		cp, err := NewCheckpoint(path)
		if err != nil {
			t.Fatal(err)
		}
		return cp
	}

	// reopen opens the existing checkpoint and returns it with its stored cid.
	reopen := func() (*Checkpoint, cid.Cid) {
		t.Helper()
		cp, start, err := OpenCheckpoint(path)
		if err != nil {
			t.Fatal(err)
		}
		return cp, start
	}

	// setAllAndClose sets each key in order and then closes the checkpoint.
	setAllAndClose := func(cp *Checkpoint, keys ...cid.Cid) {
		t.Helper()
		for _, k := range keys {
			if err := cp.Set(k); err != nil {
				t.Fatal(err)
			}
		}
		if err := cp.Close(); err != nil {
			t.Fatal(err)
		}
	}

	wantStart := func(got, want cid.Cid) {
		t.Helper()
		if !got.Equals(want) {
			t.Fatalf("expected start to be %s; got %s", want, got)
		}
	}

	// start from a new checkpoint that gets written to right away
	setAllAndClose(create(), k1, k2)

	cp, start := reopen()
	wantStart(start, k2)
	setAllAndClose(cp, k3, k4)

	cp, start = reopen()
	wantStart(start, k4)
	setAllAndClose(cp)

	// also test correct operation with an empty checkpoint
	setAllAndClose(create())

	cp, start = reopen()
	if start.Defined() {
		t.Fatal("expected start to be undefined")
	}
	setAllAndClose(cp, k1, k2)

	cp, start = reopen()
	wantStart(start, k2)
	setAllAndClose(cp, k3, k4)

	cp, start = reopen()
	wantStart(start, k4)
	setAllAndClose(cp)
}

View File

@ -0,0 +1,102 @@
package splitstore
import (
"bufio"
"io"
"os"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
)
// ColdSetWriter writes a sequence of CID records to a file through a buffered
// writer. Buffered data is flushed when the writer is closed.
type ColdSetWriter struct {
// file is the underlying coldset file; Close sets it to nil.
file *os.File
// buf buffers record writes to file.
buf *bufio.Writer
}
// ColdSetReader reads back the CID records of a coldset file through a
// buffered reader.
type ColdSetReader struct {
// file is the underlying coldset file; Close sets it to nil.
file *os.File
// buf is the buffered reader over file that records are decoded from.
buf *bufio.Reader
}
// NewColdSetWriter creates the coldset file at path, truncating any existing
// content, and returns a writer for it.
func NewColdSetWriter(path string) (*ColdSetWriter, error) {
	const flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY

	f, err := os.OpenFile(path, flags, 0644)
	if err != nil {
		return nil, xerrors.Errorf("error creating coldset: %w", err)
	}

	w := &ColdSetWriter{file: f, buf: bufio.NewWriter(f)}
	return w, nil
}
// NewColdSetReader opens the coldset file at path for reading and returns a
// reader positioned at its first record.
func NewColdSetReader(path string) (*ColdSetReader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, xerrors.Errorf("error opening coldset: %w", err)
	}

	r := &ColdSetReader{file: f, buf: bufio.NewReader(f)}
	return r, nil
}
// Write appends the record for c to the coldset. The record is only buffered;
// it is not flushed to the file by this call.
func (s *ColdSetWriter) Write(c cid.Cid) error {
	const flushNow = false
	return writeRawCid(s.buf, c, flushNow)
}
// Close flushes buffered records and closes the file. The file is closed even
// if the flush fails; a flush error takes precedence over a close error.
// Calling Close on an already closed writer is a no-op that returns nil.
func (s *ColdSetWriter) Close() error {
	if s.file == nil {
		return nil
	}

	flushErr := s.buf.Flush()
	closeErr := s.file.Close()
	s.buf, s.file = nil, nil

	if flushErr != nil {
		return flushErr
	}
	return closeErr
}
// ForEach decodes records from the reader's current position and calls f with
// each CID in file order. Iteration stops at end of file (returning nil), on a
// read error (returned wrapped), or when f returns an error (returned as is).
func (s *ColdSetReader) ForEach(f func(cid.Cid) error) error {
	// scratch space reused for every record's hash bytes
	scratch := make([]byte, 256)

	for {
		c, err := readRawCid(s.buf, scratch)
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return xerrors.Errorf("error reading coldset: %w", err)
		}

		if err := f(c); err != nil {
			return err
		}
	}
}
// Reset rewinds the reader so that the next ForEach starts again from the
// first record.
//
// Besides seeking the file to its start, the buffered reader is reset as well.
// Otherwise bytes left in the buffer by an iteration that stopped early
// (ForEach's callback returned an error) would be consumed before the data at
// the new file position, producing misaligned records.
func (s *ColdSetReader) Reset() error {
	if _, err := s.file.Seek(0, io.SeekStart); err != nil {
		return err
	}

	// discard anything still buffered from the previous position
	s.buf.Reset(s.file)
	return nil
}
// Close closes the underlying file and clears the reader's handles. Calling
// Close on an already closed reader is a no-op that returns nil.
func (s *ColdSetReader) Close() error {
	f := s.file
	if f == nil {
		return nil
	}

	s.file, s.buf = nil, nil
	return f.Close()
}

View File

@ -0,0 +1,99 @@
package splitstore
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
)
// TestColdSet writes a sequence of cids through ColdSetWriter and verifies that
// ColdSetReader returns exactly the same sequence, both on the first pass and
// again after Reset.
func TestColdSet(t *testing.T) {
	dir, err := ioutil.TempDir("", "coldset.*")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		_ = os.RemoveAll(dir)
	})

	path := filepath.Join(dir, "coldset")

	makeCid := func(i int) cid.Cid {
		h, err := multihash.Sum([]byte(fmt.Sprintf("cid.%d", i)), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}
		return cid.NewCidV1(cid.Raw, h)
	}

	const count = 1000
	cids := make([]cid.Cid, 0, count)
	for i := 0; i < count; i++ {
		cids = append(cids, makeCid(i))
	}

	cw, err := NewColdSetWriter(path)
	if err != nil {
		t.Fatal(err)
	}

	for _, c := range cids {
		if err := cw.Write(c); err != nil {
			t.Fatal(err)
		}
	}

	if err := cw.Close(); err != nil {
		t.Fatal(err)
	}

	cr, err := NewColdSetReader(path)
	if err != nil {
		t.Fatal(err)
	}
	defer cr.Close() //nolint:errcheck

	index := 0
	err = cr.ForEach(func(c cid.Cid) error {
		if index >= count {
			t.Fatal("too many cids")
		}

		if !c.Equals(cids[index]) {
			t.Fatalf("wrong cid %d; expected %s but got %s", index, cids[index], c)
		}

		index++
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	// a reader that stops early must not pass: every written cid has to come back
	if index != count {
		t.Fatalf("expected %d cids; got %d", count, index)
	}

	if err := cr.Reset(); err != nil {
		t.Fatal(err)
	}

	index = 0
	err = cr.ForEach(func(c cid.Cid) error {
		if index >= count {
			t.Fatal("too many cids")
		}

		if !c.Equals(cids[index]) {
			t.Fatalf("wrong cid; expected %s but got %s", cids[index], c)
		}

		index++
		return nil
	})
	if err != nil {
		t.Fatal(err)
	}
	// the second pass after Reset must see the complete set as well
	if index != count {
		t.Fatalf("expected %d cids after reset; got %d", count, index)
	}
}

View File

@ -14,15 +14,24 @@ var errMarkSetClosed = errors.New("markset closed")
type MarkSet interface { type MarkSet interface {
ObjectVisitor ObjectVisitor
Mark(cid.Cid) error Mark(cid.Cid) error
MarkMany([]cid.Cid) error
Has(cid.Cid) (bool, error) Has(cid.Cid) (bool, error)
Close() error Close() error
// BeginCriticalSection ensures that the markset is persisted to disk for recovery in case
// of abnormal termination during the critical section span.
BeginCriticalSection() error
// EndCriticalSection ends the critical section span.
EndCriticalSection()
} }
type MarkSetEnv interface { type MarkSetEnv interface {
// Create creates a new markset within the environment. // New creates a new markset within the environment.
// name is a unique name for this markset, mapped to the filesystem in disk-backed environments // name is a unique name for this markset, mapped to the filesystem for on-disk persistence.
// sizeHint is a hint about the expected size of the markset // sizeHint is a hint about the expected size of the markset
Create(name string, sizeHint int64) (MarkSet, error) New(name string, sizeHint int64) (MarkSet, error)
// Recover recovers an existing markset persisted on-disk.
Recover(name string) (MarkSet, error)
// Close closes the markset // Close closes the markset
Close() error Close() error
} }
@ -30,7 +39,7 @@ type MarkSetEnv interface {
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) { func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype { switch mtype {
case "map": case "map":
return NewMapMarkSetEnv() return NewMapMarkSetEnv(path)
case "badger": case "badger":
return NewBadgerMarkSetEnv(path) return NewBadgerMarkSetEnv(path)
default: default:

View File

@ -3,6 +3,7 @@ package splitstore
import ( import (
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sync" "sync"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -28,6 +29,7 @@ type BadgerMarkSet struct {
writers int writers int
seqno int seqno int
version int version int
persist bool
db *badger.DB db *badger.DB
path string path string
@ -47,11 +49,10 @@ func NewBadgerMarkSetEnv(path string) (MarkSetEnv, error) {
return &BadgerMarkSetEnv{path: msPath}, nil return &BadgerMarkSetEnv{path: msPath}, nil
} }
func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { func (e *BadgerMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
name += ".tmp"
path := filepath.Join(e.path, name) path := filepath.Join(e.path, name)
db, err := openTransientBadgerDB(path) db, err := openBadgerDB(path, false)
if err != nil { if err != nil {
return nil, xerrors.Errorf("error creating badger db: %w", err) return nil, xerrors.Errorf("error creating badger db: %w", err)
} }
@ -67,8 +68,72 @@ func (e *BadgerMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error)
return ms, nil return ms, nil
} }
// Recover reopens the badger markset named name from the environment's
// directory without clearing it. It fails if the markset's path does not exist.
// The returned markset has persistence already enabled.
func (e *BadgerMarkSetEnv) Recover(name string) (MarkSet, error) {
	dbPath := filepath.Join(e.path, name)

	_, err := os.Stat(dbPath)
	if err != nil {
		return nil, xerrors.Errorf("error stating badger db path: %w", err)
	}

	db, err := openBadgerDB(dbPath, true)
	if err != nil {
		return nil, xerrors.Errorf("error creating badger db: %w", err)
	}

	ms := new(BadgerMarkSet)
	ms.pend = make(map[string]struct{})
	ms.writing = make(map[int]map[string]struct{})
	ms.db = db
	ms.path = dbPath
	ms.persist = true
	ms.cond.L = &ms.mx

	return ms, nil
}
func (e *BadgerMarkSetEnv) Close() error { func (e *BadgerMarkSetEnv) Close() error {
return os.RemoveAll(e.path) return nil
}
// BeginCriticalSection switches the markset into persistent mode and makes sure
// what has been marked so far is synced to disk before returning.
// It returns nil immediately if persistent mode is already on.
func (s *BadgerMarkSet) BeginCriticalSection() error {
s.mx.Lock()
if s.persist {
s.mx.Unlock()
return nil
}
// if there are pending marks, rotate them into a batch while holding the lock;
// the batch is written after the lock is released
var write bool
var seqno int
if len(s.pend) > 0 {
write = true
seqno = s.nextBatch()
}
s.persist = true
s.mx.Unlock()
if write {
// all writes sync once persist is true
return s.write(seqno)
}
// wait for any pending writes and sync
s.mx.Lock()
for s.writers > 0 {
s.cond.Wait()
}
s.mx.Unlock()
return s.db.Sync()
}
func (s *BadgerMarkSet) EndCriticalSection() {
s.mx.Lock()
defer s.mx.Unlock()
s.persist = false
} }
func (s *BadgerMarkSet) Mark(c cid.Cid) error { func (s *BadgerMarkSet) Mark(c cid.Cid) error {
@ -88,6 +153,23 @@ func (s *BadgerMarkSet) Mark(c cid.Cid) error {
return nil return nil
} }
// MarkMany marks every cid in batch. The marks are added to the pending set
// under the lock; if that produces a batch to write, the batch is written after
// the lock has been released. It returns errMarkSetClosed if the markset has
// been closed.
func (s *BadgerMarkSet) MarkMany(batch []cid.Cid) error {
	var (
		write bool
		seqno int
	)

	s.mx.Lock()
	closed := s.pend == nil
	if !closed {
		write, seqno = s.putMany(batch)
	}
	s.mx.Unlock()

	switch {
	case closed:
		return errMarkSetClosed
	case write:
		return s.write(seqno)
	default:
		return nil
	}
}
func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) { func (s *BadgerMarkSet) Has(c cid.Cid) (bool, error) {
s.mx.RLock() s.mx.RLock()
defer s.mx.RUnlock() defer s.mx.RUnlock()
@ -193,16 +275,34 @@ func (s *BadgerMarkSet) tryDB(key []byte) (has bool, err error) {
// writer holds the exclusive lock // writer holds the exclusive lock
func (s *BadgerMarkSet) put(key string) (write bool, seqno int) { func (s *BadgerMarkSet) put(key string) (write bool, seqno int) {
s.pend[key] = struct{}{} s.pend[key] = struct{}{}
if len(s.pend) < badgerMarkSetBatchSize { if !s.persist && len(s.pend) < badgerMarkSetBatchSize {
return false, 0 return false, 0
} }
seqno = s.seqno seqno = s.nextBatch()
return true, seqno
}
// putMany adds the hash of every cid in batch to the pending set. It reports
// whether a batch must be written now, which is the case in persistent mode or
// once the pending set has reached badgerMarkSetBatchSize, and if so the
// sequence number of that batch.
// The caller must hold the exclusive lock.
func (s *BadgerMarkSet) putMany(batch []cid.Cid) (write bool, seqno int) {
	for i := range batch {
		s.pend[string(batch[i].Hash())] = struct{}{}
	}

	if s.persist || len(s.pend) >= badgerMarkSetBatchSize {
		return true, s.nextBatch()
	}
	return false, 0
}
func (s *BadgerMarkSet) nextBatch() int {
seqno := s.seqno
s.seqno++ s.seqno++
s.writing[seqno] = s.pend s.writing[seqno] = s.pend
s.pend = make(map[string]struct{}) s.pend = make(map[string]struct{})
return seqno
return true, seqno
} }
func (s *BadgerMarkSet) write(seqno int) (err error) { func (s *BadgerMarkSet) write(seqno int) (err error) {
@ -247,6 +347,14 @@ func (s *BadgerMarkSet) write(seqno int) (err error) {
return xerrors.Errorf("error flushing batch to badger markset: %w", err) return xerrors.Errorf("error flushing batch to badger markset: %w", err)
} }
s.mx.RLock()
persist := s.persist
s.mx.RUnlock()
if persist {
return s.db.Sync()
}
return nil return nil
} }
@ -266,13 +374,12 @@ func (s *BadgerMarkSet) Close() error {
db := s.db db := s.db
s.db = nil s.db = nil
return closeTransientBadgerDB(db, s.path) return closeBadgerDB(db, s.path, s.persist)
} }
func (s *BadgerMarkSet) SetConcurrent() {} func openBadgerDB(path string, recover bool) (*badger.DB, error) {
// if it is not a recovery, clean up first
func openTransientBadgerDB(path string) (*badger.DB, error) { if !recover {
// clean up first
err := os.RemoveAll(path) err := os.RemoveAll(path)
if err != nil { if err != nil {
return nil, xerrors.Errorf("error clearing markset directory: %w", err) return nil, xerrors.Errorf("error clearing markset directory: %w", err)
@ -282,10 +389,14 @@ func openTransientBadgerDB(path string) (*badger.DB, error) {
if err != nil { if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err) return nil, xerrors.Errorf("error creating markset directory: %w", err)
} }
}
opts := badger.DefaultOptions(path) opts := badger.DefaultOptions(path)
// we manually sync when we are in critical section
opts.SyncWrites = false opts.SyncWrites = false
// no need to do that
opts.CompactL0OnClose = false opts.CompactL0OnClose = false
// we store hashes, not much to gain by compression
opts.Compression = options.None opts.Compression = options.None
// Note: We use FileIO for loading modes to avoid memory thrashing and interference // Note: We use FileIO for loading modes to avoid memory thrashing and interference
// between the system blockstore and the markset. // between the system blockstore and the markset.
@ -294,6 +405,15 @@ func openTransientBadgerDB(path string) (*badger.DB, error) {
// exceeded 1GB in size. // exceeded 1GB in size.
opts.TableLoadingMode = options.FileIO opts.TableLoadingMode = options.FileIO
opts.ValueLogLoadingMode = options.FileIO opts.ValueLogLoadingMode = options.FileIO
// We increase the number of L0 tables before compaction to make it unlikely to
// be necessary.
opts.NumLevelZeroTables = 20 // default is 5
opts.NumLevelZeroTablesStall = 30 // default is 10
// increase the number of compactors from default 2 so that if we ever have to
// compact, it is fast
if runtime.NumCPU()/2 > opts.NumCompactors {
opts.NumCompactors = runtime.NumCPU() / 2
}
opts.Logger = &badgerLogger{ opts.Logger = &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
@ -302,12 +422,16 @@ func openTransientBadgerDB(path string) (*badger.DB, error) {
return badger.Open(opts) return badger.Open(opts)
} }
func closeTransientBadgerDB(db *badger.DB, path string) error { func closeBadgerDB(db *badger.DB, path string, persist bool) error {
err := db.Close() err := db.Close()
if err != nil { if err != nil {
return xerrors.Errorf("error closing badger markset: %w", err) return xerrors.Errorf("error closing badger markset: %w", err)
} }
if persist {
return nil
}
err = os.RemoveAll(path) err = os.RemoveAll(path)
if err != nil { if err != nil {
return xerrors.Errorf("error deleting badger markset: %w", err) return xerrors.Errorf("error deleting badger markset: %w", err)

View File

@ -1,37 +1,104 @@
package splitstore package splitstore
import ( import (
"bufio"
"io"
"os"
"path/filepath"
"sync" "sync"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
) )
type MapMarkSetEnv struct{} type MapMarkSetEnv struct {
path string
}
var _ MarkSetEnv = (*MapMarkSetEnv)(nil) var _ MarkSetEnv = (*MapMarkSetEnv)(nil)
type MapMarkSet struct { type MapMarkSet struct {
mx sync.RWMutex mx sync.RWMutex
set map[string]struct{} set map[string]struct{}
persist bool
file *os.File
buf *bufio.Writer
path string
} }
var _ MarkSet = (*MapMarkSet)(nil) var _ MarkSet = (*MapMarkSet)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) { func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil msPath := filepath.Join(path, "markset.map")
err := os.MkdirAll(msPath, 0755) //nolint:gosec
if err != nil {
return nil, xerrors.Errorf("error creating markset directory: %w", err)
} }
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { return &MapMarkSetEnv{path: msPath}, nil
}
func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)
return &MapMarkSet{ return &MapMarkSet{
set: make(map[string]struct{}, sizeHint), set: make(map[string]struct{}, sizeHint),
path: path,
}, nil }, nil
} }
// Recover rebuilds a map markset from the file named name in the environment's
// directory. The file is a sequence of records, each a length byte followed by
// that many key bytes. After loading, the file is reopened in append mode so
// that further marks are added to it; the returned markset has persistence
// enabled.
func (e *MapMarkSetEnv) Recover(name string) (MarkSet, error) {
	msPath := filepath.Join(e.path, name)

	in, err := os.Open(msPath)
	if err != nil {
		return nil, xerrors.Errorf("error opening markset file for read: %w", err)
	}
	defer in.Close() //nolint:errcheck

	// wrap a buffered reader to make this faster
	rd := bufio.NewReader(in)
	set := make(map[string]struct{})

	for {
		sz, err := rd.ReadByte()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, xerrors.Errorf("error reading markset file: %w", err)
		}

		key := make([]byte, int(sz))
		_, err = io.ReadFull(rd, key)
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, xerrors.Errorf("error reading markset file: %w", err)
		}

		set[string(key)] = struct{}{}
	}

	out, err := os.OpenFile(msPath, os.O_WRONLY|os.O_APPEND, 0)
	if err != nil {
		return nil, xerrors.Errorf("error opening markset file for write: %w", err)
	}

	return &MapMarkSet{
		set:     set,
		path:    msPath,
		persist: true,
		file:    out,
		buf:     bufio.NewWriter(out),
	}, nil
}
func (e *MapMarkSetEnv) Close() error { func (e *MapMarkSetEnv) Close() error {
return nil return nil
} }
func (s *MapMarkSet) Mark(cid cid.Cid) error { func (s *MapMarkSet) BeginCriticalSection() error {
s.mx.Lock() s.mx.Lock()
defer s.mx.Unlock() defer s.mx.Unlock()
@ -39,7 +106,104 @@ func (s *MapMarkSet) Mark(cid cid.Cid) error {
return errMarkSetClosed return errMarkSetClosed
} }
s.set[string(cid.Hash())] = struct{}{} if s.persist {
return nil
}
file, err := os.OpenFile(s.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return xerrors.Errorf("error opening markset file: %w", err)
}
// wrap a buffered writer to make this faster
s.buf = bufio.NewWriter(file)
for key := range s.set {
if err := s.writeKey([]byte(key), false); err != nil {
_ = file.Close()
s.buf = nil
return err
}
}
if err := s.buf.Flush(); err != nil {
_ = file.Close()
s.buf = nil
return xerrors.Errorf("error flushing markset file buffer: %w", err)
}
s.file = file
s.persist = true
return nil
}
// EndCriticalSection leaves persistent mode: it closes and removes the on-disk
// markset file and clears the file handles. Errors from closing or removing the
// file are ignored. It does nothing if persistent mode is not on.
func (s *MapMarkSet) EndCriticalSection() {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.persist {
		_ = s.file.Close()
		_ = os.Remove(s.path)
		s.file, s.buf, s.persist = nil, nil, false
	}
}
// Mark adds c to the markset. In persistent mode the key is also written to the
// markset file, flushed, and the file is synced before returning. It returns
// errMarkSetClosed if the markset has been closed.
func (s *MapMarkSet) Mark(c cid.Cid) error {
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.set == nil {
		return errMarkSetClosed
	}

	h := c.Hash()
	s.set[string(h)] = struct{}{}

	if !s.persist {
		return nil
	}

	if err := s.writeKey(h, true); err != nil {
		return err
	}
	if err := s.file.Sync(); err != nil {
		return xerrors.Errorf("error syncing markset: %w", err)
	}
	return nil
}
func (s *MapMarkSet) MarkMany(batch []cid.Cid) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.set == nil {
return errMarkSetClosed
}
for _, c := range batch {
hash := c.Hash()
s.set[string(hash)] = struct{}{}
if s.persist {
if err := s.writeKey(hash, false); err != nil {
return err
}
}
}
if s.persist {
if err := s.buf.Flush(); err != nil {
return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
}
if err := s.file.Sync(); err != nil {
return xerrors.Errorf("error syncing markset: %w", err)
}
}
return nil return nil
} }
@ -63,12 +227,23 @@ func (s *MapMarkSet) Visit(c cid.Cid) (bool, error) {
return false, errMarkSetClosed return false, errMarkSetClosed
} }
key := string(c.Hash()) hash := c.Hash()
key := string(hash)
if _, ok := s.set[key]; ok { if _, ok := s.set[key]; ok {
return false, nil return false, nil
} }
s.set[key] = struct{}{} s.set[key] = struct{}{}
if s.persist {
if err := s.writeKey(hash, true); err != nil {
return false, err
}
if err := s.file.Sync(); err != nil {
return false, xerrors.Errorf("error syncing markset: %w", err)
}
}
return true, nil return true, nil
} }
@ -76,6 +251,39 @@ func (s *MapMarkSet) Close() error {
s.mx.Lock() s.mx.Lock()
defer s.mx.Unlock() defer s.mx.Unlock()
s.set = nil if s.set == nil {
return nil
}
s.set = nil
if s.file != nil {
if err := s.file.Close(); err != nil {
log.Warnf("error closing markset file: %s", err)
}
if !s.persist {
if err := os.Remove(s.path); err != nil {
log.Warnf("error removing markset file: %s", err)
}
}
}
return nil
}
func (s *MapMarkSet) writeKey(k []byte, flush bool) error {
if err := s.buf.WriteByte(byte(len(k))); err != nil {
return xerrors.Errorf("error writing markset key length to disk: %w", err)
}
if _, err := s.buf.Write(k); err != nil {
return xerrors.Errorf("error writing markset key to disk: %w", err)
}
if flush {
if err := s.buf.Flush(); err != nil {
return xerrors.Errorf("error flushing markset buffer to disk: %w", err)
}
}
return nil return nil
} }

View File

@ -11,7 +11,10 @@ import (
func TestMapMarkSet(t *testing.T) { func TestMapMarkSet(t *testing.T) {
testMarkSet(t, "map") testMarkSet(t, "map")
testMarkSetRecovery(t, "map")
testMarkSetMarkMany(t, "map")
testMarkSetVisitor(t, "map") testMarkSetVisitor(t, "map")
testMarkSetVisitorRecovery(t, "map")
} }
func TestBadgerMarkSet(t *testing.T) { func TestBadgerMarkSet(t *testing.T) {
@ -21,12 +24,13 @@ func TestBadgerMarkSet(t *testing.T) {
badgerMarkSetBatchSize = bs badgerMarkSetBatchSize = bs
}) })
testMarkSet(t, "badger") testMarkSet(t, "badger")
testMarkSetRecovery(t, "badger")
testMarkSetMarkMany(t, "badger")
testMarkSetVisitor(t, "badger") testMarkSetVisitor(t, "badger")
testMarkSetVisitorRecovery(t, "badger")
} }
func testMarkSet(t *testing.T, lsType string) { func testMarkSet(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "markset.*") path, err := ioutil.TempDir("", "markset.*")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -42,12 +46,12 @@ func testMarkSet(t *testing.T, lsType string) {
} }
defer env.Close() //nolint:errcheck defer env.Close() //nolint:errcheck
hotSet, err := env.Create("hot", 0) hotSet, err := env.New("hot", 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
coldSet, err := env.Create("cold", 0) coldSet, err := env.New("cold", 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -62,6 +66,7 @@ func testMarkSet(t *testing.T, lsType string) {
} }
mustHave := func(s MarkSet, cid cid.Cid) { mustHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid) has, err := s.Has(cid)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -73,6 +78,7 @@ func testMarkSet(t *testing.T, lsType string) {
} }
mustNotHave := func(s MarkSet, cid cid.Cid) { mustNotHave := func(s MarkSet, cid cid.Cid) {
t.Helper()
has, err := s.Has(cid) has, err := s.Has(cid)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -114,12 +120,12 @@ func testMarkSet(t *testing.T, lsType string) {
t.Fatal(err) t.Fatal(err)
} }
hotSet, err = env.Create("hot", 0) hotSet, err = env.New("hot", 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
coldSet, err = env.Create("cold", 0) coldSet, err = env.New("cold", 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -150,8 +156,6 @@ func testMarkSet(t *testing.T, lsType string) {
} }
func testMarkSetVisitor(t *testing.T, lsType string) { func testMarkSetVisitor(t *testing.T, lsType string) {
t.Helper()
path, err := ioutil.TempDir("", "markset.*") path, err := ioutil.TempDir("", "markset.*")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -167,7 +171,7 @@ func testMarkSetVisitor(t *testing.T, lsType string) {
} }
defer env.Close() //nolint:errcheck defer env.Close() //nolint:errcheck
visitor, err := env.Create("test", 0) visitor, err := env.New("test", 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -219,3 +223,322 @@ func testMarkSetVisitor(t *testing.T, lsType string) {
mustNotVisit(visitor, k3) mustNotVisit(visitor, k3)
mustNotVisit(visitor, k4) mustNotVisit(visitor, k4)
} }
// testMarkSetVisitorRecovery checks that objects visited before and during a
// critical section are still known as visited after the markset is closed and
// recovered, and that recovery fails once the critical section has been ended.
func testMarkSetVisitorRecovery(t *testing.T, lsType string) {
	path, err := ioutil.TempDir("", "markset.*")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		_ = os.RemoveAll(path)
	})

	env, err := OpenMarkSetEnv(path, lsType)
	if err != nil {
		t.Fatal(err)
	}
	defer env.Close() //nolint:errcheck

	visitor, err := env.New("test", 0)
	if err != nil {
		t.Fatal(err)
	}
	defer visitor.Close() //nolint:errcheck

	makeCid := func(key string) cid.Cid {
		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}
		return cid.NewCidV1(cid.Raw, h)
	}

	// expectVisit visits each key in order and requires Visit to report want.
	expectVisit := func(v ObjectVisitor, want bool, keys ...cid.Cid) {
		for _, k := range keys {
			visit, err := v.Visit(k)
			if err != nil {
				t.Fatal(err)
			}
			switch {
			case want && !visit:
				t.Fatal("object should be visited")
			case !want && visit:
				t.Fatal("unexpected visit")
			}
		}
	}

	k1, k2, k3, k4 := makeCid("a"), makeCid("b"), makeCid("c"), makeCid("d")

	expectVisit(visitor, true, k1, k2)

	if err := visitor.BeginCriticalSection(); err != nil {
		t.Fatal(err)
	}

	expectVisit(visitor, true, k3, k4)
	expectVisit(visitor, false, k1, k2, k3, k4)

	if err := visitor.Close(); err != nil {
		t.Fatal(err)
	}

	visitor, err = env.Recover("test")
	if err != nil {
		t.Fatal(err)
	}

	expectVisit(visitor, false, k1, k2, k3, k4)

	visitor.EndCriticalSection()

	if err := visitor.Close(); err != nil {
		t.Fatal(err)
	}

	if _, err = env.Recover("test"); err == nil {
		t.Fatal("expected recovery to fail")
	}
}
// testMarkSetRecovery checks that marks made before and during a critical
// section survive closing and recovering the markset, and that recovery fails
// once the critical section has been ended.
func testMarkSetRecovery(t *testing.T, lsType string) {
	path, err := ioutil.TempDir("", "markset.*")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		_ = os.RemoveAll(path)
	})

	env, err := OpenMarkSetEnv(path, lsType)
	if err != nil {
		t.Fatal(err)
	}
	defer env.Close() //nolint:errcheck

	markSet, err := env.New("test", 0)
	if err != nil {
		t.Fatal(err)
	}

	makeCid := func(key string) cid.Cid {
		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}
		return cid.NewCidV1(cid.Raw, h)
	}

	// expectHas checks each key in order and requires Has to report want.
	expectHas := func(s MarkSet, want bool, keys ...cid.Cid) {
		t.Helper()
		for _, k := range keys {
			has, err := s.Has(k)
			if err != nil {
				t.Fatal(err)
			}
			switch {
			case want && !has:
				t.Fatal("mark not found")
			case !want && has:
				t.Fatal("unexpected mark")
			}
		}
	}

	// markAll marks each key individually, in order.
	markAll := func(s MarkSet, keys ...cid.Cid) {
		t.Helper()
		for _, k := range keys {
			if err := s.Mark(k); err != nil {
				t.Fatal(err)
			}
		}
	}

	k1, k2, k3, k4 := makeCid("a"), makeCid("b"), makeCid("c"), makeCid("d")

	markAll(markSet, k1, k2)

	expectHas(markSet, true, k1, k2)
	expectHas(markSet, false, k3, k4)

	if err := markSet.BeginCriticalSection(); err != nil {
		t.Fatal(err)
	}

	markAll(markSet, k3, k4)

	expectHas(markSet, true, k1, k2, k3, k4)

	if err := markSet.Close(); err != nil {
		t.Fatal(err)
	}

	markSet, err = env.Recover("test")
	if err != nil {
		t.Fatal(err)
	}

	expectHas(markSet, true, k1, k2, k3, k4)

	markSet.EndCriticalSection()

	if err := markSet.Close(); err != nil {
		t.Fatal(err)
	}

	if _, err = env.Recover("test"); err == nil {
		t.Fatal("expected recovery to fail")
	}
}
// testMarkSetMarkMany checks batch marking via MarkMany for the markset
// implementation selected by lsType. One batch is marked before and one after
// BeginCriticalSection; all marks must be visible immediately and again after
// the markset is closed and recovered. After EndCriticalSection and a further
// Close, another recovery attempt is expected to fail.
func testMarkSetMarkMany(t *testing.T, lsType string) {
	dir, err := ioutil.TempDir("", "markset.*")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		_ = os.RemoveAll(dir)
	})

	env, err := OpenMarkSetEnv(dir, lsType)
	if err != nil {
		t.Fatal(err)
	}
	defer env.Close() //nolint:errcheck

	markSet, err := env.New("test", 0)
	if err != nil {
		t.Fatal(err)
	}

	// newKey derives a raw CIDv1 from the SHA2-256 digest of name.
	newKey := func(name string) cid.Cid {
		digest, err := multihash.Sum([]byte(name), multihash.SHA2_256, -1)
		if err != nil {
			t.Fatal(err)
		}
		return cid.NewCidV1(cid.Raw, digest)
	}

	// expectMarked fails the test unless every key is reported by s.Has.
	expectMarked := func(s MarkSet, keys ...cid.Cid) {
		t.Helper()
		for _, k := range keys {
			found, err := s.Has(k)
			if err != nil {
				t.Fatal(err)
			}
			if !found {
				t.Fatal("mark not found")
			}
		}
	}

	// expectUnmarked fails the test if any key is reported by s.Has.
	expectUnmarked := func(s MarkSet, keys ...cid.Cid) {
		t.Helper()
		for _, k := range keys {
			found, err := s.Has(k)
			if err != nil {
				t.Fatal(err)
			}
			if found {
				t.Fatal("unexpected mark")
			}
		}
	}

	k1, k2, k3, k4 := newKey("a"), newKey("b"), newKey("c"), newKey("d")

	// first batch is marked outside the critical section
	firstBatch := []cid.Cid{k1, k2}
	if err := markSet.MarkMany(firstBatch); err != nil {
		t.Fatal(err)
	}

	expectMarked(markSet, k1, k2)
	expectUnmarked(markSet, k3, k4)

	if err := markSet.BeginCriticalSection(); err != nil {
		t.Fatal(err)
	}

	// second batch is marked inside the critical section
	secondBatch := []cid.Cid{k3, k4}
	if err := markSet.MarkMany(secondBatch); err != nil {
		t.Fatal(err)
	}

	expectMarked(markSet, k1, k2, k3, k4)

	// close while still in the critical section, then recover
	if err := markSet.Close(); err != nil {
		t.Fatal(err)
	}

	markSet, err = env.Recover("test")
	if err != nil {
		t.Fatal(err)
	}

	expectMarked(markSet, k1, k2, k3, k4)

	markSet.EndCriticalSection()

	if err := markSet.Close(); err != nil {
		t.Fatal(err)
	}

	// the critical section was ended before closing, so recovery must fail now
	if _, err = env.Recover("test"); err == nil {
		t.Fatal("expected recovery to fail")
	}
}

View File

@ -129,8 +129,6 @@ type SplitStore struct {
headChangeMx sync.Mutex headChangeMx sync.Mutex
coldPurgeSize int
chain ChainAccessor chain ChainAccessor
ds dstore.Datastore ds dstore.Datastore
cold bstore.Blockstore cold bstore.Blockstore
@ -158,6 +156,10 @@ type SplitStore struct {
txnRefsMx sync.Mutex txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{} txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{} txnMissing map[cid.Cid]struct{}
txnMarkSet MarkSet
txnSyncMx sync.Mutex
txnSyncCond sync.Cond
txnSync bool
// registered protectors // registered protectors
protectors []func(func(cid.Cid) error) error protectors []func(func(cid.Cid) error) error
@ -194,11 +196,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
cold: cold, cold: cold,
hot: hots, hot: hots,
markSetEnv: markSetEnv, markSetEnv: markSetEnv,
coldPurgeSize: defaultColdPurgeSize,
} }
ss.txnViewsCond.L = &ss.txnViewsMx ss.txnViewsCond.L = &ss.txnViewsMx
ss.txnSyncCond.L = &ss.txnSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background()) ss.ctx, ss.cancel = context.WithCancel(context.Background())
if enableDebugLog { if enableDebugLog {
@ -208,6 +209,14 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
} }
} }
if ss.checkpointExists() {
log.Info("found compaction checkpoint; resuming compaction")
if err := ss.completeCompaction(); err != nil {
markSetEnv.Close() //nolint:errcheck
return nil, xerrors.Errorf("error resuming compaction: %w", err)
}
}
return ss, nil return ss, nil
} }
@ -230,6 +239,20 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
s.txnLk.RLock() s.txnLk.RLock()
defer s.txnLk.RUnlock() defer s.txnLk.RUnlock()
// critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
if err != nil {
return false, err
}
if has {
return s.has(cid)
}
return s.cold.Has(ctx, cid)
}
has, err := s.hot.Has(ctx, cid) has, err := s.hot.Has(ctx, cid)
if err != nil { if err != nil {
@ -257,6 +280,20 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
s.txnLk.RLock() s.txnLk.RLock()
defer s.txnLk.RUnlock() defer s.txnLk.RUnlock()
// critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
if err != nil {
return nil, err
}
if has {
return s.get(cid)
}
return s.cold.Get(ctx, cid)
}
blk, err := s.hot.Get(ctx, cid) blk, err := s.hot.Get(ctx, cid)
switch err { switch err {
@ -294,6 +331,20 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
s.txnLk.RLock() s.txnLk.RLock()
defer s.txnLk.RUnlock() defer s.txnLk.RUnlock()
// critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
if err != nil {
return 0, err
}
if has {
return s.getSize(cid)
}
return s.cold.GetSize(ctx, cid)
}
size, err := s.hot.GetSize(ctx, cid) size, err := s.hot.GetSize(ctx, cid)
switch err { switch err {
@ -332,6 +383,12 @@ func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error {
s.debug.LogWrite(blk) s.debug.LogWrite(blk)
// critical section
if s.txnMarkSet != nil {
s.markLiveRefs([]cid.Cid{blk.Cid()})
return nil
}
s.trackTxnRef(blk.Cid()) s.trackTxnRef(blk.Cid())
return nil return nil
} }
@ -377,6 +434,12 @@ func (s *SplitStore) PutMany(ctx context.Context, blks []blocks.Block) error {
s.debug.LogWriteMany(blks) s.debug.LogWriteMany(blks)
// critical section
if s.txnMarkSet != nil {
s.markLiveRefs(batch)
return nil
}
s.trackTxnRefMany(batch) s.trackTxnRefMany(batch)
return nil return nil
} }
@ -436,6 +499,23 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
return cb(data) return cb(data)
} }
// critical section
s.txnLk.RLock() // the lock is released in protectView if we are not in critical section
if s.txnMarkSet != nil {
has, err := s.txnMarkSet.Has(cid)
s.txnLk.RUnlock()
if err != nil {
return err
}
if has {
return s.view(cid, cb)
}
return s.cold.View(ctx, cid, cb)
}
// views are (optimistically) protected two-fold: // views are (optimistically) protected two-fold:
// - if there is an active transaction, then the reference is protected. // - if there is an active transaction, then the reference is protected.
// - if there is no active transaction, active views are tracked in a // - if there is no active transaction, active views are tracked in a
@ -585,6 +665,11 @@ func (s *SplitStore) Close() error {
} }
if atomic.LoadInt32(&s.compacting) == 1 { if atomic.LoadInt32(&s.compacting) == 1 {
s.txnSyncMx.Lock()
s.txnSync = true
s.txnSyncCond.Broadcast()
s.txnSyncMx.Unlock()
log.Warn("close with ongoing compaction in progress; waiting for it to finish...") log.Warn("close with ongoing compaction in progress; waiting for it to finish...")
for atomic.LoadInt32(&s.compacting) == 1 { for atomic.LoadInt32(&s.compacting) == 1 {
time.Sleep(time.Second) time.Sleep(time.Second)

View File

@ -89,7 +89,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
coldCnt := new(int64) coldCnt := new(int64)
missingCnt := new(int64) missingCnt := new(int64)
visitor, err := s.markSetEnv.Create("check", 0) visitor, err := s.markSetEnv.New("check", 0)
if err != nil { if err != nil {
return xerrors.Errorf("error creating visitor: %w", err) return xerrors.Errorf("error creating visitor: %w", err)
} }

View File

@ -3,8 +3,9 @@ package splitstore
import ( import (
"bytes" "bytes"
"errors" "errors"
"os"
"path/filepath"
"runtime" "runtime"
"sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -48,6 +49,10 @@ var (
// SyncGapTime is the time delay from a tipset's min timestamp before we decide // SyncGapTime is the time delay from a tipset's min timestamp before we decide
// there is a sync gap // there is a sync gap
SyncGapTime = time.Minute SyncGapTime = time.Minute
// SyncWaitTime is the time delay from a tipset's min timestamp before we decide
// we have synced.
SyncWaitTime = 30 * time.Second
) )
var ( var (
@ -57,8 +62,6 @@ var (
const ( const (
batchSize = 16384 batchSize = 16384
defaultColdPurgeSize = 7_000_000
) )
func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error { func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
@ -141,9 +144,9 @@ func (s *SplitStore) isNearUpgrade(epoch abi.ChainEpoch) bool {
// transactionally protect incoming tipsets // transactionally protect incoming tipsets
func (s *SplitStore) protectTipSets(apply []*types.TipSet) { func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
s.txnLk.RLock() s.txnLk.RLock()
defer s.txnLk.RUnlock()
if !s.txnActive { if !s.txnActive {
s.txnLk.RUnlock()
return return
} }
@ -152,12 +155,115 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
cids = append(cids, ts.Cids()...) cids = append(cids, ts.Cids()...)
} }
if len(cids) == 0 {
s.txnLk.RUnlock()
return
}
// critical section
if s.txnMarkSet != nil {
curTs := apply[len(apply)-1]
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
doSync := time.Since(timestamp) < SyncWaitTime
go func() {
if doSync {
defer func() {
s.txnSyncMx.Lock()
defer s.txnSyncMx.Unlock()
s.txnSync = true
s.txnSyncCond.Broadcast()
}()
}
defer s.txnLk.RUnlock()
s.markLiveRefs(cids)
}()
return
}
s.trackTxnRefMany(cids) s.trackTxnRefMany(cids)
s.txnLk.RUnlock()
}
// markLiveRefs walks each of the given root cids with walkObjectIncomplete and
// records the objects it reaches in s.txnMarkSet. Errors are logged rather than
// returned. A single root is walked inline; several roots are distributed over
// a small pool of workers.
// NOTE(review): s.txnMarkSet is read without locking here; the visible callers
// appear to hold txnLk — confirm for any new call site.
func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
log.Debugf("marking %d live refs", len(cids))
startMark := time.Now()
// number of objects for which Visit returned true; updated atomically because
// walkObject may run on several workers at once
count := new(int32)
// a single visitor is shared by all walks started from this call
visitor := newConcurrentVisitor()
walkObject := func(c cid.Cid) error {
return s.walkObjectIncomplete(c, visitor,
func(c cid.Cid) error {
// unitary objects are not marked and the walk does not continue through them
if isUnitaryObject(c) {
return errStopWalk
}
visit, err := s.txnMarkSet.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
}
// presumably Visit reports false when c was already marked — stop the walk here
if !visit {
return errStopWalk
}
atomic.AddInt32(count, 1)
return nil
},
func(missing cid.Cid) error {
// c here is the root passed to walkObject, not the missing object itself
log.Warnf("missing object reference %s in %s", missing, c)
return errStopWalk
})
}
// optimize the common case of single put
if len(cids) == 1 {
if err := walkObject(cids[0]); err != nil {
log.Errorf("error marking tipset refs: %s", err)
}
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
return
}
// the channel is buffered to len(cids), so queueing every root never blocks;
// it is closed up front so workers exit once it has been drained
workch := make(chan cid.Cid, len(cids))
for _, c := range cids {
workch <- c
}
close(workch)
// a worker stops at its first error; the remaining workers keep draining
worker := func() error {
for c := range workch {
if err := walkObject(c); err != nil {
return err
}
}
return nil
}
// half the CPUs, but at least 2 and never more workers than roots
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
}
if workers > len(cids) {
workers = len(cids)
}
g := new(errgroup.Group)
for i := 0; i < workers; i++ {
g.Go(worker)
}
if err := g.Wait(); err != nil {
log.Errorf("error marking tipset refs: %s", err)
}
// all workers have returned at this point, so the plain read of *count is safe
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
} }
// transactionally protect a view // transactionally protect a view
func (s *SplitStore) protectView(c cid.Cid) { func (s *SplitStore) protectView(c cid.Cid) {
s.txnLk.RLock() // the txnLk is held for read
defer s.txnLk.RUnlock() defer s.txnLk.RUnlock()
if s.txnActive { if s.txnActive {
@ -387,6 +493,12 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
} }
func (s *SplitStore) doCompact(curTs *types.TipSet) error { func (s *SplitStore) doCompact(curTs *types.TipSet) error {
if s.checkpointExists() {
// this really shouldn't happen, but if it somehow does, it means that the hotstore
// might be potentially inconsistent; abort compaction and notify the user to intervene.
return xerrors.Errorf("checkpoint exists; aborting compaction")
}
currentEpoch := curTs.Height() currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary boundaryEpoch := currentEpoch - CompactionBoundary
@ -398,7 +510,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex) log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.Create("live", s.markSetSize) markSet, err := s.markSetEnv.New("live", s.markSetSize)
if err != nil { if err != nil {
return xerrors.Errorf("error creating mark set: %w", err) return xerrors.Errorf("error creating mark set: %w", err)
} }
@ -409,9 +521,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err return err
} }
// we are ready for concurrent marking
s.beginTxnMarking(markSet)
// 0. track all protected references at beginning of compaction; anything added later should // 0. track all protected references at beginning of compaction; anything added later should
// be transactionally protected by the write // be transactionally protected by the write
log.Info("protecting references with registered protectors") log.Info("protecting references with registered protectors")
@ -425,7 +534,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Info("marking reachable objects") log.Info("marking reachable objects")
startMark := time.Now() startMark := time.Now()
var count int64 count := new(int64)
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
func(c cid.Cid) error { func(c cid.Cid) error {
if isUnitaryObject(c) { if isUnitaryObject(c) {
@ -441,7 +550,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return errStopWalk return errStopWalk
} }
count++ atomic.AddInt64(count, 1)
return nil return nil
}) })
@ -449,9 +558,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error marking: %w", err) return xerrors.Errorf("error marking: %w", err)
} }
s.markSetSize = count + count>>2 // overestimate a bit s.markSetSize = *count + *count>>2 // overestimate a bit
log.Infow("marking done", "took", time.Since(startMark), "marked", count) log.Infow("marking done", "took", time.Since(startMark), "marked", *count)
if err := s.checkClosing(); err != nil { if err := s.checkClosing(); err != nil {
return err return err
@ -471,10 +580,15 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Info("collecting cold objects") log.Info("collecting cold objects")
startCollect := time.Now() startCollect := time.Now()
coldw, err := NewColdSetWriter(s.coldSetPath())
if err != nil {
return xerrors.Errorf("error creating coldset: %w", err)
}
defer coldw.Close() //nolint:errcheck
// some stats for logging // some stats for logging
var hotCnt, coldCnt int var hotCnt, coldCnt int
cold := make([]cid.Cid, 0, s.coldPurgeSize)
err = s.hot.ForEachKey(func(c cid.Cid) error { err = s.hot.ForEachKey(func(c cid.Cid) error {
// was it marked? // was it marked?
mark, err := markSet.Has(c) mark, err := markSet.Has(c)
@ -488,7 +602,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
} }
// it's cold, mark it as candidate for move // it's cold, mark it as candidate for move
cold = append(cold, c) if err := coldw.Write(c); err != nil {
return xerrors.Errorf("error writing cid to coldstore: %w", err)
}
coldCnt++ coldCnt++
return nil return nil
@ -498,12 +614,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error collecting cold objects: %w", err) return xerrors.Errorf("error collecting cold objects: %w", err)
} }
log.Infow("cold collection done", "took", time.Since(startCollect)) if err := coldw.Close(); err != nil {
return xerrors.Errorf("error closing coldset: %w", err)
if coldCnt > 0 {
s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
} }
log.Infow("cold collection done", "took", time.Since(startCollect))
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt) log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt))) stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt))) stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
@ -521,11 +637,17 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return err return err
} }
coldr, err := NewColdSetReader(s.coldSetPath())
if err != nil {
return xerrors.Errorf("error opening coldset: %w", err)
}
defer coldr.Close() //nolint:errcheck
// 3. copy the cold objects to the coldstore -- if we have one // 3. copy the cold objects to the coldstore -- if we have one
if !s.cfg.DiscardColdBlocks { if !s.cfg.DiscardColdBlocks {
log.Info("moving cold objects to the coldstore") log.Info("moving cold objects to the coldstore")
startMove := time.Now() startMove := time.Now()
err = s.moveColdBlocks(cold) err = s.moveColdBlocks(coldr)
if err != nil { if err != nil {
return xerrors.Errorf("error moving cold objects: %w", err) return xerrors.Errorf("error moving cold objects: %w", err)
} }
@ -534,41 +656,64 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
if err := s.checkClosing(); err != nil { if err := s.checkClosing(); err != nil {
return err return err
} }
if err := coldr.Reset(); err != nil {
return xerrors.Errorf("error resetting coldset: %w", err)
}
} }
// 4. sort cold objects so that the dags with most references are deleted first // 4. Purge cold objects with checkpointing for recovery.
// this ensures that we can't refer to a dag with its consituents already deleted, ie // This is the critical section of compaction, whereby any cold object not in the markSet is
// we lave no dangling references. // considered already deleted.
log.Info("sorting cold objects") // We delete cold objects in batches, holding the transaction lock, where we check the markSet
startSort := time.Now() // again for new references created by the VM.
err = s.sortObjects(cold) // After each batch, we write a checkpoint to disk; if the process is interrupted before completion,
if err != nil { // the process will continue from the checkpoint in the next recovery.
return xerrors.Errorf("error sorting objects: %w", err) if err := s.beginCriticalSection(markSet); err != nil {
} return xerrors.Errorf("error beginning critical section: %w", err)
log.Infow("sorting done", "took", time.Since(startSort))
// 4.1 protect transactional refs once more
// strictly speaking, this is not necessary as purge will do it before deleting each
// batch. however, there is likely a largish number of references accumulated during
// ths sort and this protects before entering pruge context.
err = s.protectTxnRefs(markSet)
if err != nil {
return xerrors.Errorf("error protecting transactional refs: %w", err)
} }
if err := s.checkClosing(); err != nil { if err := s.checkClosing(); err != nil {
return err return err
} }
// wait for the head to catch up so that the current tipset is marked
s.waitForSync()
if err := s.checkClosing(); err != nil {
return err
}
checkpoint, err := NewCheckpoint(s.checkpointPath())
if err != nil {
return xerrors.Errorf("error creating checkpoint: %w", err)
}
defer checkpoint.Close() //nolint:errcheck
// 5. purge cold objects from the hotstore, taking protected references into account // 5. purge cold objects from the hotstore, taking protected references into account
log.Info("purging cold objects from the hotstore") log.Info("purging cold objects from the hotstore")
startPurge := time.Now() startPurge := time.Now()
err = s.purge(cold, markSet) err = s.purge(coldr, checkpoint, markSet)
if err != nil { if err != nil {
return xerrors.Errorf("error purging cold blocks: %w", err) return xerrors.Errorf("error purging cold objects: %w", err)
} }
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge)) log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
s.endCriticalSection()
if err := checkpoint.Close(); err != nil {
log.Warnf("error closing checkpoint: %s", err)
}
if err := os.Remove(s.checkpointPath()); err != nil {
log.Warnf("error removing checkpoint: %s", err)
}
if err := coldr.Close(); err != nil {
log.Warnf("error closing coldset: %s", err)
}
if err := os.Remove(s.coldSetPath()); err != nil {
log.Warnf("error removing coldset: %s", err)
}
// we are done; do some housekeeping // we are done; do some housekeeping
s.endTxnProtect() s.endTxnProtect()
s.gcHotstore() s.gcHotstore()
@ -599,12 +744,51 @@ func (s *SplitStore) beginTxnProtect() {
defer s.txnLk.Unlock() defer s.txnLk.Unlock()
s.txnActive = true s.txnActive = true
s.txnSync = false
s.txnRefs = make(map[cid.Cid]struct{}) s.txnRefs = make(map[cid.Cid]struct{})
s.txnMissing = make(map[cid.Cid]struct{}) s.txnMissing = make(map[cid.Cid]struct{})
} }
func (s *SplitStore) beginTxnMarking(markSet MarkSet) { func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {
log.Info("beginning transactional marking") log.Info("beginning critical section")
// do that once first to get the bulk before the markset is in critical section
if err := s.protectTxnRefs(markSet); err != nil {
return xerrors.Errorf("error protecting transactional references: %w", err)
}
if err := markSet.BeginCriticalSection(); err != nil {
return xerrors.Errorf("error beginning critical section for markset: %w", err)
}
s.txnLk.Lock()
defer s.txnLk.Unlock()
s.txnMarkSet = markSet
// and do it again while holding the lock to mark references that might have been created
// in the meantime and avoid races of the type Has->txnRef->enterCS->Get fails because
// it's not in the markset
if err := s.protectTxnRefs(markSet); err != nil {
return xerrors.Errorf("error protecting transactional references: %w", err)
}
return nil
}
// waitForSync blocks the caller until s.txnSync has been set to true, waiting
// on s.txnSyncCond while holding s.txnSyncMx. The time spent waiting is logged
// on return.
func (s *SplitStore) waitForSync() {
log.Info("waiting for sync")
startWait := time.Now()
defer func() {
log.Infow("waiting for sync done", "took", time.Since(startWait))
}()
s.txnSyncMx.Lock()
defer s.txnSyncMx.Unlock()
// re-check the flag after every wakeup; Wait releases txnSyncMx while blocked
for !s.txnSync {
s.txnSyncCond.Wait()
}
} }
func (s *SplitStore) endTxnProtect() { func (s *SplitStore) endTxnProtect() {
@ -616,8 +800,20 @@ func (s *SplitStore) endTxnProtect() {
} }
s.txnActive = false s.txnActive = false
s.txnSync = false
s.txnRefs = nil s.txnRefs = nil
s.txnMissing = nil s.txnMissing = nil
s.txnMarkSet = nil
}
// endCriticalSection ends the critical section on the current transactional
// markset and clears s.txnMarkSet, all while holding txnLk for writing.
// NOTE(review): s.txnMarkSet is dereferenced without a nil check, so this
// assumes a markset was installed beforehand — confirm against callers.
func (s *SplitStore) endCriticalSection() {
log.Info("ending critical section")
s.txnLk.Lock()
defer s.txnLk.Unlock()
s.txnMarkSet.EndCriticalSection()
s.txnMarkSet = nil
} }
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch, func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
@ -857,7 +1053,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m
return nil return nil
} }
// internal version used by walk // internal version used during compaction and related operations
func (s *SplitStore) view(c cid.Cid, cb func([]byte) error) error { func (s *SplitStore) view(c cid.Cid, cb func([]byte) error) error {
if isIdentiyCid(c) { if isIdentiyCid(c) {
data, err := decodeIdentityCid(c) data, err := decodeIdentityCid(c)
@ -892,10 +1088,34 @@ func (s *SplitStore) has(c cid.Cid) (bool, error) {
return s.cold.Has(s.ctx, c) return s.cold.Has(s.ctx, c)
} }
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error { func (s *SplitStore) get(c cid.Cid) (blocks.Block, error) {
blk, err := s.hot.Get(s.ctx, c)
switch err {
case nil:
return blk, nil
case bstore.ErrNotFound:
return s.cold.Get(s.ctx, c)
default:
return nil, err
}
}
// getSize returns the size of the block identified by c. The hotstore is
// consulted first; the coldstore is queried only when the hotstore reports
// bstore.ErrNotFound. Any other hotstore error is returned unchanged together
// with a zero size.
func (s *SplitStore) getSize(c cid.Cid) (int, error) {
	size, err := s.hot.GetSize(s.ctx, c)
	if err == nil {
		return size, nil
	}
	if err == bstore.ErrNotFound {
		// not in the hotstore; fall back to the coldstore
		return s.cold.GetSize(s.ctx, c)
	}
	return 0, err
}
func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
batch := make([]blocks.Block, 0, batchSize) batch := make([]blocks.Block, 0, batchSize)
for _, c := range cold { err := coldr.ForEach(func(c cid.Cid) error {
if err := s.checkClosing(); err != nil { if err := s.checkClosing(); err != nil {
return err return err
} }
@ -904,7 +1124,7 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
if err != nil { if err != nil {
if err == bstore.ErrNotFound { if err == bstore.ErrNotFound {
log.Warnf("hotstore missing block %s", c) log.Warnf("hotstore missing block %s", c)
continue return nil
} }
return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err) return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err)
@ -918,6 +1138,12 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
} }
batch = batch[:0] batch = batch[:0]
} }
return nil
})
if err != nil {
return xerrors.Errorf("error iterating coldset: %w", err)
} }
if len(batch) > 0 { if len(batch) > 0 {
@ -930,160 +1156,60 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
return nil return nil
} }
// sorts a slice of objects heaviest first -- it's a little expensive but worth the func (s *SplitStore) purge(coldr *ColdSetReader, checkpoint *Checkpoint, markSet MarkSet) error {
// guarantee that we don't leave dangling references behind, e.g. if we die in the middle batch := make([]cid.Cid, 0, batchSize)
// of a purge.
func (s *SplitStore) sortObjects(cids []cid.Cid) error {
// we cache the keys to avoid making a gazillion of strings
keys := make(map[cid.Cid]string)
key := func(c cid.Cid) string {
s, ok := keys[c]
if !ok {
s = string(c.Hash())
keys[c] = s
}
return s
}
// compute sorting weights as the cumulative number of DAG links
weights := make(map[string]int)
for _, c := range cids {
// this can take quite a while, so check for shutdown with every opportunity
if err := s.checkClosing(); err != nil {
return err
}
w := s.getObjectWeight(c, weights, key)
weights[key(c)] = w
}
// sort!
sort.Slice(cids, func(i, j int) bool {
wi := weights[key(cids[i])]
wj := weights[key(cids[j])]
if wi == wj {
return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0
}
return wi > wj
})
return nil
}
func (s *SplitStore) getObjectWeight(c cid.Cid, weights map[string]int, key func(cid.Cid) string) int {
w, ok := weights[key(c)]
if ok {
return w
}
// we treat block headers specially to avoid walking the entire chain
var hdr types.BlockHeader
err := s.view(c, func(data []byte) error {
return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
})
if err == nil {
w1 := s.getObjectWeight(hdr.ParentStateRoot, weights, key)
weights[key(hdr.ParentStateRoot)] = w1
w2 := s.getObjectWeight(hdr.Messages, weights, key)
weights[key(hdr.Messages)] = w2
return 1 + w1 + w2
}
var links []cid.Cid
err = s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c)
})
})
if err != nil {
return 1
}
w = 1
for _, c := range links {
// these are internal refs, so dags will be dags
if c.Prefix().Codec != cid.DagCBOR {
w++
continue
}
wc := s.getObjectWeight(c, weights, key)
weights[key(c)] = wc
w += wc
}
return w
}
func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error {
if len(cids) == 0 {
return nil
}
// we don't delete one giant batch of millions of objects, but rather do smaller batches
// so that we don't stop the world for an extended period of time
done := false
for i := 0; !done; i++ {
start := i * batchSize
end := start + batchSize
if end >= len(cids) {
end = len(cids)
done = true
}
err := deleteBatch(cids[start:end])
if err != nil {
return xerrors.Errorf("error deleting batch: %w", err)
}
}
return nil
}
func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
deadCids := make([]cid.Cid, 0, batchSize) deadCids := make([]cid.Cid, 0, batchSize)
var purgeCnt, liveCnt int var purgeCnt, liveCnt int
defer func() { defer func() {
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt) log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
}() }()
return s.purgeBatch(cids, deleteBatch := func() error {
func(cids []cid.Cid) error { pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet)
deadCids := deadCids[:0]
purgeCnt += pc
liveCnt += lc
batch = batch[:0]
for {
if err := s.checkClosing(); err != nil {
return err return err
} }
s.txnLk.Lock() err := coldr.ForEach(func(c cid.Cid) error {
if len(s.txnRefs) == 0 { batch = append(batch, c)
// keep the lock! if len(batch) == batchSize {
break return deleteBatch()
} }
// unlock and protect return nil
s.txnLk.Unlock() })
err := s.protectTxnRefs(markSet)
if err != nil { if err != nil {
return xerrors.Errorf("error protecting transactional refs: %w", err) return err
}
} }
if len(batch) > 0 {
return deleteBatch()
}
return nil
}
func (s *SplitStore) purgeBatch(batch, deadCids []cid.Cid, checkpoint *Checkpoint, markSet MarkSet) (purgeCnt int, liveCnt int, err error) {
if err := s.checkClosing(); err != nil {
return 0, 0, err
}
s.txnLk.Lock()
defer s.txnLk.Unlock() defer s.txnLk.Unlock()
for _, c := range cids { for _, c := range batch {
live, err := markSet.Has(c) has, err := markSet.Has(c)
if err != nil { if err != nil {
return xerrors.Errorf("error checking for liveness: %w", err) return 0, 0, xerrors.Errorf("error checking markset for liveness: %w", err)
} }
if live { if has {
liveCnt++ liveCnt++
continue continue
} }
@ -1091,16 +1217,141 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
deadCids = append(deadCids, c) deadCids = append(deadCids, c)
} }
err := s.hot.DeleteMany(s.ctx, deadCids) if len(deadCids) == 0 {
if err != nil { if err := checkpoint.Set(batch[len(batch)-1]); err != nil {
return xerrors.Errorf("error purging cold objects: %w", err) return 0, 0, xerrors.Errorf("error setting checkpoint: %w", err)
}
return 0, liveCnt, nil
}
if err := s.hot.DeleteMany(s.ctx, deadCids); err != nil {
return 0, liveCnt, xerrors.Errorf("error purging cold objects: %w", err)
} }
s.debug.LogDelete(deadCids) s.debug.LogDelete(deadCids)
purgeCnt = len(deadCids)
if err := checkpoint.Set(batch[len(batch)-1]); err != nil {
return purgeCnt, liveCnt, xerrors.Errorf("error setting checkpoint: %w", err)
}
return purgeCnt, liveCnt, nil
}
// coldSetPath returns the path of the "coldset" file located under s.path.
func (s *SplitStore) coldSetPath() string {
	const name = "coldset"
	return filepath.Join(s.path, name)
}
// checkpointPath returns the path of the "checkpoint" file located under s.path.
func (s *SplitStore) checkpointPath() string {
	const name = "checkpoint"
	return filepath.Join(s.path, name)
}
// checkpointExists reports whether os.Stat succeeds on the checkpoint path.
// Any Stat error, not only "does not exist", yields false.
func (s *SplitStore) checkpointExists() bool {
	if _, err := os.Stat(s.checkpointPath()); err != nil {
		return false
	}
	return true
}
// completeCompaction finishes the purge phase of a compaction that left a
// checkpoint behind. It opens the checkpoint and the coldset, recovers the
// "live" markset, and hands all three to completePurge together with the cid
// read back from the checkpoint. Once the purge succeeds it ends the markset's
// critical section and closes and removes the checkpoint and coldset files;
// failures in that cleanup are only logged as warnings.
//
// An error is returned if any of the three resources cannot be opened or if
// the purge itself fails.
func (s *SplitStore) completeCompaction() error {
	ckptPath := s.checkpointPath()
	coldPath := s.coldSetPath()

	ckpt, resumeFrom, err := OpenCheckpoint(ckptPath)
	if err != nil {
		return xerrors.Errorf("error opening checkpoint: %w", err)
	}
	defer ckpt.Close() //nolint:errcheck

	reader, err := NewColdSetReader(coldPath)
	if err != nil {
		return xerrors.Errorf("error opening coldset: %w", err)
	}
	defer reader.Close() //nolint:errcheck

	live, err := s.markSetEnv.Recover("live")
	if err != nil {
		return xerrors.Errorf("error recovering markset: %w", err)
	}
	defer live.Close() //nolint:errcheck

	// PURGE
	log.Info("purging cold objects from the hotstore")
	began := time.Now()
	if err := s.completePurge(reader, ckpt, resumeFrom, live); err != nil {
		return xerrors.Errorf("error purging cold objects: %w", err)
	}
	log.Infow("purging cold objects from hotstore done", "took", time.Since(began))

	live.EndCriticalSection()

	// best-effort cleanup of the on-disk recovery state
	if err := ckpt.Close(); err != nil {
		log.Warnf("error closing checkpoint: %s", err)
	}
	if err := os.Remove(ckptPath); err != nil {
		log.Warnf("error removing checkpoint: %s", err)
	}
	if err := reader.Close(); err != nil {
		log.Warnf("error closing coldset: %s", err)
	}
	if err := os.Remove(coldPath); err != nil {
		log.Warnf("error removing coldset: %s", err)
	}

	// Note: the splitstore can be started from here on. A compaction should run
	// on the first head change, and that will trigger gc on the hotstore. The
	// resulting back-to-back compaction is acceptable because the head will
	// have advanced during marking and coldset accumulation.
	return nil
}
func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint, start cid.Cid, markSet MarkSet) error {
if !start.Defined() {
return s.purge(coldr, checkpoint, markSet)
}
seeking := true
batch := make([]cid.Cid, 0, batchSize)
deadCids := make([]cid.Cid, 0, batchSize)
var purgeCnt, liveCnt int
defer func() {
log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
}()
deleteBatch := func() error {
pc, lc, err := s.purgeBatch(batch, deadCids, checkpoint, markSet)
purgeCnt += pc
liveCnt += lc
batch = batch[:0]
return err
}
err := coldr.ForEach(func(c cid.Cid) error {
if seeking {
if start.Equals(c) {
seeking = false
}
return nil
}
batch = append(batch, c)
if len(batch) == batchSize {
return deleteBatch()
}
purgeCnt += len(deadCids)
return nil return nil
}) })
if err != nil {
return err
}
if len(batch) > 0 {
return deleteBatch()
}
return nil
} }
// I really don't like having this code, but we seem to have some occasional DAG references with // I really don't like having this code, but we seem to have some occasional DAG references with

View File

@ -4,6 +4,8 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"os"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -20,12 +22,14 @@ import (
datastore "github.com/ipfs/go-datastore" datastore "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync" dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
mh "github.com/multiformats/go-multihash"
) )
func init() { func init() {
CompactionThreshold = 5 CompactionThreshold = 5
CompactionBoundary = 2 CompactionBoundary = 2
WarmupBoundary = 0 WarmupBoundary = 0
SyncWaitTime = time.Millisecond
logging.SetLogLevel("splitstore", "DEBUG") logging.SetLogLevel("splitstore", "DEBUG")
} }
@ -80,8 +84,17 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Fatal(err) t.Fatal(err)
} }
path, err := ioutil.TempDir("", "splitstore.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
// open the splitstore // open the splitstore
ss, err := Open("", ds, hot, cold, cfg) ss, err := Open(path, ds, hot, cold, cfg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -125,6 +138,10 @@ func testSplitStore(t *testing.T, cfg *Config) {
} }
waitForCompaction := func() { waitForCompaction := func() {
ss.txnSyncMx.Lock()
ss.txnSync = true
ss.txnSyncCond.Broadcast()
ss.txnSyncMx.Unlock()
for atomic.LoadInt32(&ss.compacting) == 1 { for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
@ -259,8 +276,17 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
path, err := ioutil.TempDir("", "splitstore.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
// open the splitstore // open the splitstore
ss, err := Open("", ds, hot, cold, &Config{MarkSetType: "map"}) ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -305,6 +331,10 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
} }
waitForCompaction := func() { waitForCompaction := func() {
ss.txnSyncMx.Lock()
ss.txnSync = true
ss.txnSyncCond.Broadcast()
ss.txnSyncMx.Unlock()
for atomic.LoadInt32(&ss.compacting) == 1 { for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
@ -426,17 +456,25 @@ func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, app
type mockStore struct { type mockStore struct {
mx sync.Mutex mx sync.Mutex
set map[cid.Cid]blocks.Block set map[string]blocks.Block
} }
func newMockStore() *mockStore { func newMockStore() *mockStore {
return &mockStore{set: make(map[cid.Cid]blocks.Block)} return &mockStore{set: make(map[string]blocks.Block)}
}
// keyOf returns the map key under which the block for c is stored: the bytes
// of c.Hash() converted to a string.
func (b *mockStore) keyOf(c cid.Cid) string {
	hash := c.Hash()
	return string(hash)
}
func (b *mockStore) cidOf(k string) cid.Cid {
return cid.NewCidV1(cid.Raw, mh.Multihash([]byte(k)))
} }
func (b *mockStore) Has(_ context.Context, cid cid.Cid) (bool, error) { func (b *mockStore) Has(_ context.Context, cid cid.Cid) (bool, error) {
b.mx.Lock() b.mx.Lock()
defer b.mx.Unlock() defer b.mx.Unlock()
_, ok := b.set[cid] _, ok := b.set[b.keyOf(cid)]
return ok, nil return ok, nil
} }
@ -446,7 +484,7 @@ func (b *mockStore) Get(_ context.Context, cid cid.Cid) (blocks.Block, error) {
b.mx.Lock() b.mx.Lock()
defer b.mx.Unlock() defer b.mx.Unlock()
blk, ok := b.set[cid] blk, ok := b.set[b.keyOf(cid)]
if !ok { if !ok {
return nil, blockstore.ErrNotFound return nil, blockstore.ErrNotFound
} }
@ -474,7 +512,7 @@ func (b *mockStore) Put(_ context.Context, blk blocks.Block) error {
b.mx.Lock() b.mx.Lock()
defer b.mx.Unlock() defer b.mx.Unlock()
b.set[blk.Cid()] = blk b.set[b.keyOf(blk.Cid())] = blk
return nil return nil
} }
@ -483,7 +521,7 @@ func (b *mockStore) PutMany(_ context.Context, blks []blocks.Block) error {
defer b.mx.Unlock() defer b.mx.Unlock()
for _, blk := range blks { for _, blk := range blks {
b.set[blk.Cid()] = blk b.set[b.keyOf(blk.Cid())] = blk
} }
return nil return nil
} }
@ -492,7 +530,7 @@ func (b *mockStore) DeleteBlock(_ context.Context, cid cid.Cid) error {
b.mx.Lock() b.mx.Lock()
defer b.mx.Unlock() defer b.mx.Unlock()
delete(b.set, cid) delete(b.set, b.keyOf(cid))
return nil return nil
} }
@ -501,7 +539,7 @@ func (b *mockStore) DeleteMany(_ context.Context, cids []cid.Cid) error {
defer b.mx.Unlock() defer b.mx.Unlock()
for _, c := range cids { for _, c := range cids {
delete(b.set, c) delete(b.set, b.keyOf(c))
} }
return nil return nil
} }
@ -515,7 +553,7 @@ func (b *mockStore) ForEachKey(f func(cid.Cid) error) error {
defer b.mx.Unlock() defer b.mx.Unlock()
for c := range b.set { for c := range b.set {
err := f(c) err := f(b.cidOf(c))
if err != nil { if err != nil {
return err return err
} }

View File

@ -62,7 +62,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
xcount := new(int64) xcount := new(int64)
missing := new(int64) missing := new(int64)
visitor, err := s.markSetEnv.Create("warmup", 0) visitor, err := s.markSetEnv.New("warmup", 0)
if err != nil { if err != nil {
return xerrors.Errorf("error creating visitor: %w", err) return xerrors.Errorf("error creating visitor: %w", err)
} }

View File

@ -163,11 +163,11 @@
#HotStoreType = "badger" #HotStoreType = "badger"
# MarkSetType specifies the type of the markset. # MarkSetType specifies the type of the markset.
# It can be "map" (default) for in memory marking or "badger" for on-disk marking. # It can be "map" for in memory marking or "badger" (default) for on-disk marking.
# #
# type: string # type: string
# env var: LOTUS_CHAINSTORE_SPLITSTORE_MARKSETTYPE # env var: LOTUS_CHAINSTORE_SPLITSTORE_MARKSETTYPE
#MarkSetType = "map" #MarkSetType = "badger"
# HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond # HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
# the compaction boundary; default is 0. # the compaction boundary; default is 0.

View File

@ -85,7 +85,7 @@ func DefaultFullNode() *FullNode {
Splitstore: Splitstore{ Splitstore: Splitstore{
ColdStoreType: "universal", ColdStoreType: "universal",
HotStoreType: "badger", HotStoreType: "badger",
MarkSetType: "map", MarkSetType: "badger",
HotStoreFullGCFrequency: 20, HotStoreFullGCFrequency: 20,
}, },

View File

@ -810,7 +810,7 @@ Only currently supported value is "badger".`,
Type: "string", Type: "string",
Comment: `MarkSetType specifies the type of the markset. Comment: `MarkSetType specifies the type of the markset.
It can be "map" (default) for in memory marking or "badger" for on-disk marking.`, It can be "map" for in memory marking or "badger" (default) for on-disk marking.`,
}, },
{ {
Name: "HotStoreMessageRetention", Name: "HotStoreMessageRetention",

View File

@ -363,7 +363,7 @@ type Splitstore struct {
// Only currently supported value is "badger". // Only currently supported value is "badger".
HotStoreType string HotStoreType string
// MarkSetType specifies the type of the markset. // MarkSetType specifies the type of the markset.
// It can be "map" (default) for in memory marking or "badger" for on-disk marking. // It can be "map" for in memory marking or "badger" (default) for on-disk marking.
MarkSetType string MarkSetType string
// HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond // HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond