3795cc2bd2
This paves the way for better object lifetime management. Concretely, it makes it possible to: - have different stores backing chain and state data. - having the same datastore library, but using different parameters. - attach different caching layers/policies to each class of data, e.g. sizing caches differently. - specifying different retention policies for chain and state data. This separation is important because: - access patterns/frequency of chain and state data are different. - state is derivable from chain, so one could never expunge the chain store, and only retain state objects reachable from the last finality in the state store.
107 lines
2.6 KiB
Go
107 lines
2.6 KiB
Go
package blockstore
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/xerrors"
|
|
|
|
blocks "github.com/ipfs/go-block-format"
|
|
"github.com/ipfs/go-cid"
|
|
)
|
|
|
|
// UnwrapFallbackStore takes a blockstore, and returns the underlying blockstore
|
|
// if it was a FallbackStore. Otherwise, it just returns the supplied store
|
|
// unmodified.
|
|
func UnwrapFallbackStore(bs Blockstore) (Blockstore, bool) {
|
|
if fbs, ok := bs.(*FallbackStore); ok {
|
|
return fbs.Blockstore, true
|
|
}
|
|
return bs, false
|
|
}
|
|
|
|
// FallbackStore is a read-through store that queries another (potentially
|
|
// remote) source if the block is not found locally. If the block is found
|
|
// during the fallback, it stores it in the local store.
|
|
type FallbackStore struct {
|
|
Blockstore
|
|
|
|
lk sync.RWMutex
|
|
// missFn is the function that will be invoked on a local miss to pull the
|
|
// block from elsewhere.
|
|
missFn func(context.Context, cid.Cid) (blocks.Block, error)
|
|
}
|
|
|
|
var _ Blockstore = (*FallbackStore)(nil)
|
|
|
|
func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blocks.Block, error)) {
|
|
fbs.lk.Lock()
|
|
defer fbs.lk.Unlock()
|
|
|
|
fbs.missFn = missFn
|
|
}
|
|
|
|
func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
|
|
log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c)
|
|
fbs.lk.RLock()
|
|
defer fbs.lk.RUnlock()
|
|
|
|
if fbs.missFn == nil {
|
|
// FallbackStore wasn't configured yet (chainstore/bitswap aren't up yet)
|
|
// Wait for a bit and retry
|
|
fbs.lk.RUnlock()
|
|
time.Sleep(5 * time.Second)
|
|
fbs.lk.RLock()
|
|
|
|
if fbs.missFn == nil {
|
|
log.Errorw("fallbackstore: missFn not configured yet")
|
|
return nil, ErrNotFound
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Second)
|
|
defer cancel()
|
|
|
|
b, err := fbs.missFn(ctx, c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// chain bitswap puts blocks in temp blockstore which is cleaned up
|
|
// every few min (to drop any messages we fetched but don't want)
|
|
// in this case we want to keep this block around
|
|
if err := fbs.Put(b); err != nil {
|
|
return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err)
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
func (fbs *FallbackStore) Get(c cid.Cid) (blocks.Block, error) {
|
|
b, err := fbs.Blockstore.Get(c)
|
|
switch err {
|
|
case nil:
|
|
return b, nil
|
|
case ErrNotFound:
|
|
return fbs.getFallback(c)
|
|
default:
|
|
return b, err
|
|
}
|
|
}
|
|
|
|
func (fbs *FallbackStore) GetSize(c cid.Cid) (int, error) {
|
|
sz, err := fbs.Blockstore.GetSize(c)
|
|
switch err {
|
|
case nil:
|
|
return sz, nil
|
|
case ErrNotFound:
|
|
b, err := fbs.getFallback(c)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return len(b.RawData()), nil
|
|
default:
|
|
return sz, err
|
|
}
|
|
}
|