package blockstore

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
	blocks "github.com/ipfs/go-libipfs/blocks"
	"github.com/raulk/clock"
	"go.uber.org/multierr"
)

// TimedCacheBlockstore is a blockstore that keeps blocks for at least the
// specified caching interval before discarding them. Garbage collection must
// be started and stopped by calling Start/Stop.
//
// Under the covers, it's implemented with an active and an inactive blockstore
// that are rotated every cache time interval. This means a block is kept for
// at least one interval and for at most 2x the cache interval.
//
// Create a new instance by calling the NewTimedCacheBlockstore constructor.
type TimedCacheBlockstore struct {
	mu               sync.RWMutex
	active, inactive MemBlockstore
	clock            clock.Clock
	interval         time.Duration
	closeCh          chan struct{}
	doneRotatingCh   chan struct{} // optional; receives a signal after each rotation
}

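// NewTimedCacheBlockstore creates a TimedCacheBlockstore that rotates its
// internal stores every interval. A minimal usage sketch (illustrative only;
// error handling elided; blocks.NewBlock comes from the imported
// github.com/ipfs/go-libipfs/blocks package):
//
//	tcb := NewTimedCacheBlockstore(time.Minute)
//	_ = tcb.Start(context.Background())
//	defer tcb.Stop(context.Background())
//
//	blk := blocks.NewBlock([]byte("some data"))
//	_ = tcb.Put(context.Background(), blk)
//
//	// The block stays retrievable for at least one interval (one minute
//	// here) and for at most two, after which a rotation discards it.
//	got, _ := tcb.Get(context.Background(), blk.Cid())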
func NewTimedCacheBlockstore(interval time.Duration) *TimedCacheBlockstore {
	b := &TimedCacheBlockstore{
		active:   NewMemory(),
		inactive: NewMemory(),
		interval: interval,
		clock:    clock.New(),
	}
	return b
}

func (t *TimedCacheBlockstore) Start(_ context.Context) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.closeCh != nil {
		return fmt.Errorf("already started")
	}
	t.closeCh = make(chan struct{})

	// Create this timer before starting the goroutine. Otherwise, creating the timer will race
	// with adding time to the mock clock, and we could add time _first_, then stall waiting for
	// a timer that'll never fire.
	ticker := t.clock.Ticker(t.interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				t.rotate()
				if t.doneRotatingCh != nil {
					t.doneRotatingCh <- struct{}{}
				}
			case <-t.closeCh:
				return
			}
		}
	}()
	return nil
}

func (t *TimedCacheBlockstore) Stop(_ context.Context) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.closeCh == nil {
		return fmt.Errorf("not started")
	}
	select {
	case <-t.closeCh:
		// already closed
	default:
		close(t.closeCh)
	}
	return nil
}

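// rotate discards the inactive store and demotes the active one. Illustrative
// timeline: a block Put at t = 0.3*interval sits in the active store until the
// first rotation at t = interval, then in the inactive store until the second
// rotation at t = 2*interval, at which point it is dropped; so its lifetime is
// always between 1x and 2x the interval.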
func (t *TimedCacheBlockstore) rotate() {
	newBs := NewMemory()

	t.mu.Lock()
	t.inactive, t.active = t.active, newBs
	t.mu.Unlock()
}

func (t *TimedCacheBlockstore) Flush(ctx context.Context) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	if err := t.active.Flush(ctx); err != nil {
		return err
	}
	return t.inactive.Flush(ctx)
}

func (t *TimedCacheBlockstore) Put(ctx context.Context, b blocks.Block) error {
	// Don't check the inactive set here. We want to keep this block for at
	// least one interval.
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.active.Put(ctx, b)
}

func (t *TimedCacheBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.active.PutMany(ctx, bs)
}

func (t *TimedCacheBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
	// The underlying blockstore is always a "mem" blockstore, so there's no
	// performance difference between View and Get. We call Get here to avoid
	// invoking an arbitrary callback while holding the lock.
	t.mu.RLock()
	block, err := t.active.Get(ctx, k)
	if ipld.IsNotFound(err) {
		block, err = t.inactive.Get(ctx, k)
	}
	t.mu.RUnlock()

	if err != nil {
		return err
	}
	return callback(block.RawData())
}

func (t *TimedCacheBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	b, err := t.active.Get(ctx, k)
	if ipld.IsNotFound(err) {
		b, err = t.inactive.Get(ctx, k)
	}
	return b, err
}

func (t *TimedCacheBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	size, err := t.active.GetSize(ctx, k)
	if ipld.IsNotFound(err) {
		size, err = t.inactive.GetSize(ctx, k)
	}
	return size, err
}

func (t *TimedCacheBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	if has, err := t.active.Has(ctx, k); err != nil {
		return false, err
	} else if has {
		return true, nil
	}
	return t.inactive.Has(ctx, k)
}

func (t *TimedCacheBlockstore) HashOnRead(_ bool) {
	// no-op: hash-on-read is ignored for this in-memory cache.
}

func (t *TimedCacheBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return multierr.Combine(t.active.DeleteBlock(ctx, k), t.inactive.DeleteBlock(ctx, k))
}

func (t *TimedCacheBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return multierr.Combine(t.active.DeleteMany(ctx, ks), t.inactive.DeleteMany(ctx, ks))
}

func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	ch := make(chan cid.Cid, len(t.active)+len(t.inactive))
	for _, b := range t.active {
		ch <- b.Cid()
	}
	for _, b := range t.inactive {
		c := b.Cid()
		// Skip keys already emitted from the active store (MemBlockstore is
		// keyed by multihash).
		if _, ok := t.active[string(c.Hash())]; ok {
			continue
		}
		ch <- c
	}
	close(ch)
	return ch, nil
}