From 909f7039d47c351c2581de0c39bb1c9b5d8a721d Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 9 Jul 2021 09:54:12 +0300 Subject: [PATCH] make badger Close-safe --- blockstore/badger/blockstore.go | 113 +++++++++++++++++++++----------- 1 file changed, 76 insertions(+), 37 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index f77691a6f..17ebbd7ea 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -5,7 +5,7 @@ import ( "fmt" "io" "runtime" - "sync/atomic" + "sync" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" @@ -73,20 +73,16 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) { } const ( - stateOpen int64 = iota + stateOpen = iota stateClosing stateClosed ) // Blockstore is a badger-backed IPLD blockstore. -// -// NOTE: once Close() is called, methods will try their best to return -// ErrBlockstoreClosed. This will guaranteed to happen for all subsequent -// operation calls after Close() has returned, but it may not happen for -// operations in progress. Those are likely to fail with a different error. type Blockstore struct { - // state is accessed atomically - state int64 + stateLk sync.RWMutex + state int + viewers sync.WaitGroup DB *badger.DB @@ -125,19 +121,51 @@ func Open(opts Options) (*Blockstore, error) { // Close closes the store. If the store has already been closed, this noops and // returns an error, even if the first closure resulted in error. func (b *Blockstore) Close() error { - if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) { + b.stateLk.Lock() + if b.state != stateOpen { + b.stateLk.Unlock() return nil } + b.state = stateClosing + b.stateLk.Unlock() + + defer func() { + b.stateLk.Lock() + b.state = stateClosed + b.stateLk.Unlock() + }() + + // wait for all accesses to complete + b.viewers.Wait() - defer atomic.StoreInt64(&b.state, stateClosed) return b.DB.Close() } +func (b *Blockstore) access() error { + b.stateLk.RLock() + defer b.stateLk.RUnlock() + + if b.state != stateOpen { + return ErrBlockstoreClosed + } + + b.viewers.Add(1) + return nil +} + +func (b *Blockstore) isOpen() bool { + b.stateLk.RLock() + defer b.stateLk.RUnlock() + + return b.state == stateOpen +} + // CollectGarbage runs garbage collection on the value log func (b *Blockstore) CollectGarbage() error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() var err error for err == nil { @@ -154,9 +182,10 @@ func (b *Blockstore) CollectGarbage() error { // Compact runs a synchronous compaction func (b *Blockstore) Compact() error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() nworkers := runtime.NumCPU() / 2 if nworkers < 2 { @@ -169,9 +198,10 @@ func (b *Blockstore) Compact() error { // View implements blockstore.Viewer, which leverages zero-copy read-only // access to values. func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -192,9 +222,10 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { // Has implements Blockstore.Has. func (b *Blockstore) Has(cid cid.Cid) (bool, error) { - if atomic.LoadInt64(&b.state) != stateOpen { - return false, ErrBlockstoreClosed + if err := b.access(); err != nil { + return false, err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -222,9 +253,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) { return nil, blockstore.ErrNotFound } - if atomic.LoadInt64(&b.state) != stateOpen { - return nil, ErrBlockstoreClosed + if err := b.access(); err != nil { + return nil, err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -251,9 +283,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) { // GetSize implements Blockstore.GetSize. func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { - if atomic.LoadInt64(&b.state) != stateOpen { - return -1, ErrBlockstoreClosed + if err := b.access(); err != nil { + return 0, err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -280,9 +313,10 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { // Put implements Blockstore.Put. func (b *Blockstore) Put(block blocks.Block) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(block.Cid()) if pooled { @@ -300,9 +334,10 @@ func (b *Blockstore) Put(block blocks.Block) error { // PutMany implements Blockstore.PutMany. func (b *Blockstore) PutMany(blocks []blocks.Block) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() // toReturn tracks the byte slices to return to the pool, if we're using key // prefixing. we can't return each slice to the pool after each Set, because @@ -339,9 +374,10 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error { // DeleteBlock implements Blockstore.DeleteBlock. func (b *Blockstore) DeleteBlock(cid cid.Cid) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() k, pooled := b.PooledStorageKey(cid) if pooled { @@ -354,9 +390,10 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error { } func (b *Blockstore) DeleteMany(cids []cid.Cid) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() // toReturn tracks the byte slices to return to the pool, if we're using key // prefixing. we can't return each slice to the pool after each Set, because @@ -393,8 +430,8 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error { // AllKeysChan implements Blockstore.AllKeysChan. func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - if atomic.LoadInt64(&b.state) != stateOpen { - return nil, ErrBlockstoreClosed + if err := b.access(); err != nil { + return nil, err } txn := b.DB.NewTransaction(false) @@ -406,6 +443,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { ch := make(chan cid.Cid) go func() { + defer b.viewers.Done() defer close(ch) defer iter.Close() @@ -416,7 +454,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { if ctx.Err() != nil { return // context has fired. } - if atomic.LoadInt64(&b.state) != stateOpen { + if !b.isOpen() { // open iterators will run even after the database is closed... return // closing, yield. } @@ -445,9 +483,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // Implementation of BlockstoreIterator interface func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { - if atomic.LoadInt64(&b.state) != stateOpen { - return ErrBlockstoreClosed + if err := b.access(); err != nil { + return err } + defer b.viewers.Done() txn := b.DB.NewTransaction(false) defer txn.Discard() @@ -462,7 +501,7 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { var buf []byte for iter.Rewind(); iter.Valid(); iter.Next() { - if atomic.LoadInt64(&b.state) != stateOpen { + if !b.isOpen() { return ErrBlockstoreClosed }