diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 3087c01f7..1b52eb548 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -601,6 +601,18 @@ func (b *Blockstore) View(ctx context.Context, cid cid.Cid, fn func([]byte) erro }) } +func (b *Blockstore) Flush(context.Context) error { + if err := b.access(); err != nil { + return err + } + defer b.viewers.Done() + + b.lockDB() + defer b.unlockDB() + + return b.db.Sync() +} + // Has implements Blockstore.Has. func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { if err := b.access(); err != nil { diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index be1f1199f..8d16bbd50 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -18,6 +18,7 @@ type Blockstore interface { blockstore.Blockstore blockstore.Viewer BatchDeleter + Flusher } // BasicBlockstore is an alias to the original IPFS Blockstore. @@ -25,6 +26,10 @@ type BasicBlockstore = blockstore.Blockstore type Viewer = blockstore.Viewer +type Flusher interface { + Flush(context.Context) error +} + type BatchDeleter interface { DeleteMany(ctx context.Context, cids []cid.Cid) error } @@ -106,6 +111,13 @@ type adaptedBlockstore struct { var _ Blockstore = (*adaptedBlockstore)(nil) +func (a *adaptedBlockstore) Flush(ctx context.Context) error { + if flusher, canFlush := a.Blockstore.(Flusher); canFlush { + return flusher.Flush(ctx) + } + return nil +} + func (a *adaptedBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { blk, err := a.Get(ctx, cid) if err != nil { diff --git a/blockstore/buffered.go b/blockstore/buffered.go index e089ed561..bc37b2f69 100644 --- a/blockstore/buffered.go +++ b/blockstore/buffered.go @@ -46,6 +46,8 @@ var ( _ Viewer = (*BufferedBlockstore)(nil) ) +func (bs *BufferedBlockstore) Flush(ctx context.Context) error { return bs.write.Flush(ctx) } + func (bs *BufferedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { a, err := bs.read.AllKeysChan(ctx) if err != nil { diff --git a/blockstore/discard.go b/blockstore/discard.go index 7f1a76a22..878797561 100644 --- a/blockstore/discard.go +++ b/blockstore/discard.go @@ -38,6 +38,10 @@ func (b *discardstore) View(ctx context.Context, cid cid.Cid, f func([]byte) err return b.bs.View(ctx, cid, f) } +func (b *discardstore) Flush(ctx context.Context) error { + return nil +} + func (b *discardstore) Put(ctx context.Context, blk blocks.Block) error { return nil } diff --git a/blockstore/idstore.go b/blockstore/idstore.go index a0ecec5f0..ae807076d 100644 --- a/blockstore/idstore.go +++ b/blockstore/idstore.go @@ -179,3 +179,7 @@ func (b *idstore) Close() error { } return nil } + +func (b *idstore) Flush(ctx context.Context) error { + return b.bs.Flush(ctx) +} diff --git a/blockstore/mem.go b/blockstore/mem.go index 05da287c5..5b06634de 100644 --- a/blockstore/mem.go +++ b/blockstore/mem.go @@ -17,6 +17,8 @@ func NewMemory() MemBlockstore { // To match behavior of badger blockstore we index by multihash only. type MemBlockstore map[string]blocks.Block +func (MemBlockstore) Flush(context.Context) error { return nil } + func (m MemBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error { delete(m, string(k.Hash())) return nil diff --git a/blockstore/net.go b/blockstore/net.go index a6b008af2..62aceed71 100644 --- a/blockstore/net.go +++ b/blockstore/net.go @@ -410,6 +410,8 @@ func (n *NetworkStore) HashOnRead(enabled bool) { return } +func (*NetworkStore) Flush(context.Context) error { return nil } + func (n *NetworkStore) Stop(ctx context.Context) error { close(n.closing) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 410cc50df..bd9efb630 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -476,6 +476,23 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { } } +func (s *SplitStore) Flush(ctx context.Context) error { + s.txnLk.RLock() + defer s.txnLk.RUnlock() + + if err := s.cold.Flush(ctx); err != nil { + return err + } + if err := s.hot.Flush(ctx); err != nil { + return err + } + if err := s.ds.Sync(ctx, dstore.Key{}); err != nil { + return err + } + + return nil +} + func (s *SplitStore) Put(ctx context.Context, blk blocks.Block) error { if isIdentiyCid(blk.Cid()) { return nil diff --git a/blockstore/splitstore/splitstore_expose.go b/blockstore/splitstore/splitstore_expose.go index d092fbb9b..7461e338d 100644 --- a/blockstore/splitstore/splitstore_expose.go +++ b/blockstore/splitstore/splitstore_expose.go @@ -77,6 +77,10 @@ func (es *exposedSplitStore) GetSize(ctx context.Context, c cid.Cid) (int, error return size, err } +func (es *exposedSplitStore) Flush(ctx context.Context) error { + return es.s.Flush(ctx) +} + func (es *exposedSplitStore) Put(ctx context.Context, blk blocks.Block) error { return es.s.Put(ctx, blk) } diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index c97a9d01c..68e1bfb65 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -757,6 +757,8 @@ func (b *mockStore) DeleteMany(_ context.Context, cids []cid.Cid) error { return nil } +func (b *mockStore) Flush(context.Context) error { return nil } + func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return nil, errors.New("not implemented") } diff --git a/blockstore/sync.go b/blockstore/sync.go index 4f97027ae..652943dca 100644 --- a/blockstore/sync.go +++ b/blockstore/sync.go @@ -20,6 +20,8 @@ type SyncBlockstore struct { bs MemBlockstore // specifically use a memStore to save indirection overhead. } +func (*SyncBlockstore) Flush(context.Context) error { return nil } + func (m *SyncBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/blockstore/timed.go b/blockstore/timed.go index dda2e1958..3deabb7b8 100644 --- a/blockstore/timed.go +++ b/blockstore/timed.go @@ -93,6 +93,16 @@ func (t *TimedCacheBlockstore) rotate() { t.mu.Unlock() } +func (t *TimedCacheBlockstore) Flush(ctx context.Context) error { + t.mu.Lock() + defer t.mu.Unlock() + + if err := t.active.Flush(ctx); err != nil { + return err + } + return t.inactive.Flush(ctx) +} + func (t *TimedCacheBlockstore) Put(ctx context.Context, b blocks.Block) error { // Don't check the inactive set here. We want to keep this block for at // least one interval. diff --git a/blockstore/union.go b/blockstore/union.go index 3372cd20c..ae6f81955 100644 --- a/blockstore/union.go +++ b/blockstore/union.go @@ -55,6 +55,15 @@ func (m unionBlockstore) GetSize(ctx context.Context, cid cid.Cid) (size int, er return size, err } +func (m unionBlockstore) Flush(ctx context.Context) (err error) { + for _, bs := range m { + if err = bs.Flush(ctx); err != nil { + break + } + } + return err +} + func (m unionBlockstore) Put(ctx context.Context, block blocks.Block) (err error) { for _, bs := range m { if err = bs.Put(ctx, block); err != nil {