Merge pull request #10465 from ribasushi/flush_bs

chore: blockstore: Plumb through a proper Flush() method on all blockstores
This commit is contained in:
Łukasz Magiera 2023-03-15 10:16:01 +01:00 committed by GitHub
commit a7c9a83091
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 82 additions and 0 deletions

View File

@ -601,6 +601,18 @@ func (b *Blockstore) View(ctx context.Context, cid cid.Cid, fn func([]byte) erro
}) })
} }
func (b *Blockstore) Flush(context.Context) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
return b.db.Sync()
}
// Has implements Blockstore.Has. // Has implements Blockstore.Has.
func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
if err := b.access(); err != nil { if err := b.access(); err != nil {

View File

@ -18,6 +18,7 @@ type Blockstore interface {
blockstore.Blockstore blockstore.Blockstore
blockstore.Viewer blockstore.Viewer
BatchDeleter BatchDeleter
Flusher
} }
// BasicBlockstore is an alias to the original IPFS Blockstore. // BasicBlockstore is an alias to the original IPFS Blockstore.
@ -25,6 +26,10 @@ type BasicBlockstore = blockstore.Blockstore
type Viewer = blockstore.Viewer type Viewer = blockstore.Viewer
type Flusher interface {
Flush(context.Context) error
}
type BatchDeleter interface { type BatchDeleter interface {
DeleteMany(ctx context.Context, cids []cid.Cid) error DeleteMany(ctx context.Context, cids []cid.Cid) error
} }
@ -106,6 +111,13 @@ type adaptedBlockstore struct {
var _ Blockstore = (*adaptedBlockstore)(nil) var _ Blockstore = (*adaptedBlockstore)(nil)
func (a *adaptedBlockstore) Flush(ctx context.Context) error {
if flusher, canFlush := a.Blockstore.(Flusher); canFlush {
return flusher.Flush(ctx)
}
return nil
}
func (a *adaptedBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { func (a *adaptedBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error {
blk, err := a.Get(ctx, cid) blk, err := a.Get(ctx, cid)
if err != nil { if err != nil {

View File

@ -46,6 +46,8 @@ var (
_ Viewer = (*BufferedBlockstore)(nil) _ Viewer = (*BufferedBlockstore)(nil)
) )
func (bs *BufferedBlockstore) Flush(ctx context.Context) error { return bs.write.Flush(ctx) }
func (bs *BufferedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { func (bs *BufferedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
a, err := bs.read.AllKeysChan(ctx) a, err := bs.read.AllKeysChan(ctx)
if err != nil { if err != nil {

View File

@ -38,6 +38,10 @@ func (b *discardstore) View(ctx context.Context, cid cid.Cid, f func([]byte) err
return b.bs.View(ctx, cid, f) return b.bs.View(ctx, cid, f)
} }
func (b *discardstore) Flush(ctx context.Context) error {
return nil
}
func (b *discardstore) Put(ctx context.Context, blk blocks.Block) error { func (b *discardstore) Put(ctx context.Context, blk blocks.Block) error {
return nil return nil
} }

View File

@ -179,3 +179,7 @@ func (b *idstore) Close() error {
} }
return nil return nil
} }
func (b *idstore) Flush(ctx context.Context) error {
return b.bs.Flush(ctx)
}

View File

@ -17,6 +17,8 @@ func NewMemory() MemBlockstore {
// To match behavior of badger blockstore we index by multihash only. // To match behavior of badger blockstore we index by multihash only.
type MemBlockstore map[string]blocks.Block type MemBlockstore map[string]blocks.Block
func (MemBlockstore) Flush(context.Context) error { return nil }
func (m MemBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error { func (m MemBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
delete(m, string(k.Hash())) delete(m, string(k.Hash()))
return nil return nil

View File

@ -410,6 +410,8 @@ func (n *NetworkStore) HashOnRead(enabled bool) {
return return
} }
func (*NetworkStore) Flush(context.Context) error { return nil }
func (n *NetworkStore) Stop(ctx context.Context) error { func (n *NetworkStore) Stop(ctx context.Context) error {
close(n.closing) close(n.closing)

View File

@ -476,6 +476,23 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
} }
} }
func (s *SplitStore) Flush(ctx context.Context) error {
s.txnLk.RLock()
defer s.txnLk.RUnlock()
if err := s.cold.Flush(ctx); err != nil {
return err
}
if err := s.hot.Flush(ctx); err != nil {
return err
}
if err := s.ds.Sync(ctx, dstore.Key{}); err != nil {
return err
}
return nil
}
func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error { func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error {
if isIdentiyCid(blk.Cid()) { if isIdentiyCid(blk.Cid()) {
return nil return nil

View File

@ -77,6 +77,10 @@ func (es *exposedSplitStore) GetSize(ctx context.Context, c cid.Cid) (int, error
return size, err return size, err
} }
func (es *exposedSplitStore) Flush(ctx context.Context) error {
return es.s.Flush(ctx)
}
func (es *exposedSplitStore) Put(ctx context.Context, blk blocks.Block) error { func (es *exposedSplitStore) Put(ctx context.Context, blk blocks.Block) error {
return es.s.Put(ctx, blk) return es.s.Put(ctx, blk)
} }

View File

@ -757,6 +757,8 @@ func (b *mockStore) DeleteMany(_ context.Context, cids []cid.Cid) error {
return nil return nil
} }
func (b *mockStore) Flush(context.Context) error { return nil }
func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }

View File

@ -20,6 +20,8 @@ type SyncBlockstore struct {
bs MemBlockstore // specifically use a memStore to save indirection overhead. bs MemBlockstore // specifically use a memStore to save indirection overhead.
} }
func (*SyncBlockstore) Flush(context.Context) error { return nil }
func (m *SyncBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error { func (m *SyncBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()

View File

@ -93,6 +93,16 @@ func (t *TimedCacheBlockstore) rotate() {
t.mu.Unlock() t.mu.Unlock()
} }
func (t *TimedCacheBlockstore) Flush(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.active.Flush(ctx); err != nil {
return err
}
return t.inactive.Flush(ctx)
}
func (t *TimedCacheBlockstore) Put(ctx context.Context, b blocks.Block) error { func (t *TimedCacheBlockstore) Put(ctx context.Context, b blocks.Block) error {
// Don't check the inactive set here. We want to keep this block for at // Don't check the inactive set here. We want to keep this block for at
// least one interval. // least one interval.

View File

@ -55,6 +55,15 @@ func (m unionBlockstore) GetSize(ctx context.Context, cid cid.Cid) (size int, er
return size, err return size, err
} }
func (m unionBlockstore) Flush(ctx context.Context) (err error) {
for _, bs := range m {
if err = bs.Flush(ctx); err != nil {
break
}
}
return err
}
func (m unionBlockstore) Put(ctx context.Context, block blocks.Block) (err error) { func (m unionBlockstore) Put(ctx context.Context, block blocks.Block) (err error) {
for _, bs := range m { for _, bs := range m {
if err = bs.Put(ctx, block); err != nil { if err = bs.Put(ctx, block); err != nil {