make badger Close-safe

This commit is contained in:
vyzo 2021-07-09 09:54:12 +03:00
parent abdf4a161a
commit 909f7039d4

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"runtime" "runtime"
"sync/atomic" "sync"
"github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options" "github.com/dgraph-io/badger/v2/options"
@ -73,20 +73,16 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) {
} }
const ( const (
stateOpen int64 = iota stateOpen = iota
stateClosing stateClosing
stateClosed stateClosed
) )
// Blockstore is a badger-backed IPLD blockstore. // Blockstore is a badger-backed IPLD blockstore.
//
// NOTE: once Close() is called, methods will try their best to return
// ErrBlockstoreClosed. This will guaranteed to happen for all subsequent
// operation calls after Close() has returned, but it may not happen for
// operations in progress. Those are likely to fail with a different error.
type Blockstore struct { type Blockstore struct {
// state is accessed atomically stateLk sync.RWMutex
state int64 state int
viewers sync.WaitGroup
DB *badger.DB DB *badger.DB
@ -125,19 +121,51 @@ func Open(opts Options) (*Blockstore, error) {
// Close closes the store. If the store has already been closed, this noops and // Close closes the store. If the store has already been closed, this noops and
// returns an error, even if the first closure resulted in error. // returns an error, even if the first closure resulted in error.
func (b *Blockstore) Close() error { func (b *Blockstore) Close() error {
if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) { b.stateLk.Lock()
if b.state != stateOpen {
b.stateLk.Unlock()
return nil return nil
} }
b.state = stateClosing
b.stateLk.Unlock()
defer func() {
b.stateLk.Lock()
b.state = stateClosed
b.stateLk.Unlock()
}()
// wait for all accesses to complete
b.viewers.Wait()
defer atomic.StoreInt64(&b.state, stateClosed)
return b.DB.Close() return b.DB.Close()
} }
func (b *Blockstore) access() error {
b.stateLk.RLock()
defer b.stateLk.RUnlock()
if b.state != stateOpen {
return ErrBlockstoreClosed
}
b.viewers.Add(1)
return nil
}
func (b *Blockstore) isOpen() bool {
b.stateLk.RLock()
defer b.stateLk.RUnlock()
return b.state == stateOpen
}
// CollectGarbage runs garbage collection on the value log // CollectGarbage runs garbage collection on the value log
func (b *Blockstore) CollectGarbage() error { func (b *Blockstore) CollectGarbage() error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
var err error var err error
for err == nil { for err == nil {
@ -154,9 +182,10 @@ func (b *Blockstore) CollectGarbage() error {
// Compact runs a synchronous compaction // Compact runs a synchronous compaction
func (b *Blockstore) Compact() error { func (b *Blockstore) Compact() error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
nworkers := runtime.NumCPU() / 2 nworkers := runtime.NumCPU() / 2
if nworkers < 2 { if nworkers < 2 {
@ -169,9 +198,10 @@ func (b *Blockstore) Compact() error {
// View implements blockstore.Viewer, which leverages zero-copy read-only // View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values. // access to values.
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -192,9 +222,10 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
// Has implements Blockstore.Has. // Has implements Blockstore.Has.
func (b *Blockstore) Has(cid cid.Cid) (bool, error) { func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return false, ErrBlockstoreClosed return false, err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -222,9 +253,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
return nil, blockstore.ErrNotFound return nil, blockstore.ErrNotFound
} }
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return nil, ErrBlockstoreClosed return nil, err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -251,9 +283,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
// GetSize implements Blockstore.GetSize. // GetSize implements Blockstore.GetSize.
func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return -1, ErrBlockstoreClosed return 0, err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -280,9 +313,10 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
// Put implements Blockstore.Put. // Put implements Blockstore.Put.
func (b *Blockstore) Put(block blocks.Block) error { func (b *Blockstore) Put(block blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(block.Cid()) k, pooled := b.PooledStorageKey(block.Cid())
if pooled { if pooled {
@ -300,9 +334,10 @@ func (b *Blockstore) Put(block blocks.Block) error {
// PutMany implements Blockstore.PutMany. // PutMany implements Blockstore.PutMany.
func (b *Blockstore) PutMany(blocks []blocks.Block) error { func (b *Blockstore) PutMany(blocks []blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
// toReturn tracks the byte slices to return to the pool, if we're using key // toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because // prefixing. we can't return each slice to the pool after each Set, because
@ -339,9 +374,10 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
// DeleteBlock implements Blockstore.DeleteBlock. // DeleteBlock implements Blockstore.DeleteBlock.
func (b *Blockstore) DeleteBlock(cid cid.Cid) error { func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -354,9 +390,10 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
} }
func (b *Blockstore) DeleteMany(cids []cid.Cid) error { func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
// toReturn tracks the byte slices to return to the pool, if we're using key // toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because // prefixing. we can't return each slice to the pool after each Set, because
@ -393,8 +430,8 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
// AllKeysChan implements Blockstore.AllKeysChan. // AllKeysChan implements Blockstore.AllKeysChan.
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return nil, ErrBlockstoreClosed return nil, err
} }
txn := b.DB.NewTransaction(false) txn := b.DB.NewTransaction(false)
@ -406,6 +443,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid) ch := make(chan cid.Cid)
go func() { go func() {
defer b.viewers.Done()
defer close(ch) defer close(ch)
defer iter.Close() defer iter.Close()
@ -416,7 +454,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if ctx.Err() != nil { if ctx.Err() != nil {
return // context has fired. return // context has fired.
} }
if atomic.LoadInt64(&b.state) != stateOpen { if !b.isOpen() {
// open iterators will run even after the database is closed... // open iterators will run even after the database is closed...
return // closing, yield. return // closing, yield.
} }
@ -445,9 +483,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// Implementation of BlockstoreIterator interface // Implementation of BlockstoreIterator interface
func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
txn := b.DB.NewTransaction(false) txn := b.DB.NewTransaction(false)
defer txn.Discard() defer txn.Discard()
@ -462,7 +501,7 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
var buf []byte var buf []byte
for iter.Rewind(); iter.Valid(); iter.Next() { for iter.Rewind(); iter.Valid(); iter.Next() {
if atomic.LoadInt64(&b.state) != stateOpen { if !b.isOpen() {
return ErrBlockstoreClosed return ErrBlockstoreClosed
} }