rename blockstores for consistency.

Raúl Kripalani 2021-01-29 23:17:25 +00:00
parent e02fdf5064
commit d1104fec4c
14 changed files with 71 additions and 85 deletions

View File

@@ -13,27 +13,27 @@ type ChainIO interface {
ChainHasObj(context.Context, cid.Cid) (bool, error)
}
type apiBStore struct {
type apiBlockstore struct {
api ChainIO
}
// This blockstore is adapted in the constructor.
var _ BasicBlockstore = &apiBStore{}
var _ BasicBlockstore = (*apiBlockstore)(nil)
func NewAPIBlockstore(cio ChainIO) Blockstore {
bs := &apiBStore{api: cio}
bs := &apiBlockstore{api: cio}
return Adapt(bs) // return an adapted blockstore.
}
func (a *apiBStore) DeleteBlock(cid.Cid) error {
func (a *apiBlockstore) DeleteBlock(cid.Cid) error {
return xerrors.New("not supported")
}
func (a *apiBStore) Has(c cid.Cid) (bool, error) {
func (a *apiBlockstore) Has(c cid.Cid) (bool, error) {
return a.api.ChainHasObj(context.TODO(), c)
}
func (a *apiBStore) Get(c cid.Cid) (blocks.Block, error) {
func (a *apiBlockstore) Get(c cid.Cid) (blocks.Block, error) {
bb, err := a.api.ChainReadObj(context.TODO(), c)
if err != nil {
return nil, err
@@ -41,7 +41,7 @@ func (a *apiBStore) Get(c cid.Cid) (blocks.Block, error) {
return blocks.NewBlockWithCid(bb, c)
}
func (a *apiBStore) GetSize(c cid.Cid) (int, error) {
func (a *apiBlockstore) GetSize(c cid.Cid) (int, error) {
bb, err := a.api.ChainReadObj(context.TODO(), c)
if err != nil {
return 0, err
@@ -49,18 +49,18 @@ func (a *apiBStore) GetSize(c cid.Cid) (int, error) {
return len(bb), nil
}
func (a *apiBStore) Put(blocks.Block) error {
func (a *apiBlockstore) Put(blocks.Block) error {
return xerrors.New("not supported")
}
func (a *apiBStore) PutMany([]blocks.Block) error {
func (a *apiBlockstore) PutMany([]blocks.Block) error {
return xerrors.New("not supported")
}
func (a *apiBStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
func (a *apiBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.New("not supported")
}
func (a *apiBStore) HashOnRead(enabled bool) {
func (a *apiBlockstore) HashOnRead(enabled bool) {
return
}
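For illustration, a minimal sketch of exercising the renamed API-backed blockstore through its constructor. The stub ChainIO, the sample data, and the lotus blockstore import path are assumptions for this sketch (it also assumes ChainIO declares only the two methods used here), not part of the change:

package main

import (
	"context"
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/lotus/blockstore"
)

// stubChainIO is a hypothetical in-memory ChainIO used only for this sketch.
type stubChainIO struct {
	objs map[cid.Cid][]byte
}

func (s *stubChainIO) ChainReadObj(_ context.Context, c cid.Cid) ([]byte, error) {
	data, ok := s.objs[c]
	if !ok {
		return nil, fmt.Errorf("object not found")
	}
	return data, nil
}

func (s *stubChainIO) ChainHasObj(_ context.Context, c cid.Cid) (bool, error) {
	_, ok := s.objs[c]
	return ok, nil
}

func main() {
	blk := blocks.NewBlock([]byte("example object"))
	cio := &stubChainIO{objs: map[cid.Cid][]byte{blk.Cid(): blk.RawData()}}

	// NewAPIBlockstore wraps the ChainIO and returns the adapted blockstore.
	bs := blockstore.NewAPIBlockstore(cio)

	got, err := bs.Get(blk.Cid())
	fmt.Println(got != nil, err) // true <nil>: reads are served via ChainReadObj

	fmt.Println(bs.Put(blk)) // "not supported": the API blockstore is read-only
}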

View File

@@ -52,8 +52,10 @@ func NewTieredBstore(r Blockstore, w Blockstore) *BufferedBlockstore {
}
}
var _ Blockstore = (*BufferedBlockstore)(nil)
var _ Viewer = (*BufferedBlockstore)(nil)
var (
_ Blockstore = (*BufferedBlockstore)(nil)
_ Viewer = (*BufferedBlockstore)(nil)
)
func (bs *BufferedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
a, err := bs.read.AllKeysChan(ctx)

View File

@@ -18,14 +18,14 @@ import (
"github.com/ipfs/interface-go-ipfs-core/path"
)
type IpfsBstore struct {
type IPFSBlockstore struct {
ctx context.Context
api iface.CoreAPI
}
var _ Blockstore = (*IpfsBstore)(nil)
var _ BasicBlockstore = (*IPFSBlockstore)(nil)
func NewIpfsBstore(ctx context.Context, onlineMode bool) (*IpfsBstore, error) {
func NewLocalIPFSBlockstore(ctx context.Context, onlineMode bool) (Blockstore, error) {
localApi, err := httpapi.NewLocalApi()
if err != nil {
return nil, xerrors.Errorf("getting local ipfs api: %w", err)
@@ -34,14 +34,14 @@ func NewIpfsBstore(ctx context.Context, onlineMode bool) (*IpfsBstore, error) {
if err != nil {
return nil, xerrors.Errorf("setting offline mode: %s", err)
}
return &IpfsBstore{
b := &IPFSBlockstore{
ctx: ctx,
api: api,
}, nil
}
return Adapt(b), nil
}
func NewRemoteIpfsBstore(ctx context.Context, maddr multiaddr.Multiaddr, onlineMode bool) (*IpfsBstore, error) {
func NewRemoteIPFSBlockstore(ctx context.Context, maddr multiaddr.Multiaddr, onlineMode bool) (Blockstore, error) {
httpApi, err := httpapi.NewApi(maddr)
if err != nil {
return nil, xerrors.Errorf("setting remote ipfs api: %w", err)
@@ -50,18 +50,18 @@ func NewRemoteIpfsBstore(ctx context.Context, maddr multiaddr.Multiaddr, onlineM
if err != nil {
return nil, xerrors.Errorf("applying offline mode: %s", err)
}
return &IpfsBstore{
b := &IPFSBlockstore{
ctx: ctx,
api: api,
}, nil
}
return Adapt(b), nil
}
func (i *IpfsBstore) DeleteBlock(cid cid.Cid) error {
func (i *IPFSBlockstore) DeleteBlock(cid cid.Cid) error {
return xerrors.Errorf("not supported")
}
func (i *IpfsBstore) Has(cid cid.Cid) (bool, error) {
func (i *IPFSBlockstore) Has(cid cid.Cid) (bool, error) {
_, err := i.api.Block().Stat(i.ctx, path.IpldPath(cid))
if err != nil {
// The underlying client is running in Offline mode.
@@ -77,7 +77,7 @@ func (i *IpfsBstore) Has(cid cid.Cid) (bool, error) {
return true, nil
}
func (i *IpfsBstore) Get(cid cid.Cid) (blocks.Block, error) {
func (i *IPFSBlockstore) Get(cid cid.Cid) (blocks.Block, error) {
rd, err := i.api.Block().Get(i.ctx, path.IpldPath(cid))
if err != nil {
return nil, xerrors.Errorf("getting ipfs block: %w", err)
@@ -91,7 +91,7 @@ func (i *IpfsBstore) Get(cid cid.Cid) (blocks.Block, error) {
return blocks.NewBlockWithCid(data, cid)
}
func (i *IpfsBstore) GetSize(cid cid.Cid) (int, error) {
func (i *IPFSBlockstore) GetSize(cid cid.Cid) (int, error) {
st, err := i.api.Block().Stat(i.ctx, path.IpldPath(cid))
if err != nil {
return 0, xerrors.Errorf("getting ipfs block: %w", err)
@@ -100,7 +100,7 @@ func (i *IpfsBstore) GetSize(cid cid.Cid) (int, error) {
return st.Size(), nil
}
func (i *IpfsBstore) Put(block blocks.Block) error {
func (i *IPFSBlockstore) Put(block blocks.Block) error {
mhd, err := multihash.Decode(block.Cid().Hash())
if err != nil {
return err
@@ -112,7 +112,7 @@ func (i *IpfsBstore) Put(block blocks.Block) error {
return err
}
func (i *IpfsBstore) PutMany(blocks []blocks.Block) error {
func (i *IPFSBlockstore) PutMany(blocks []blocks.Block) error {
// TODO: could be done in parallel
for _, block := range blocks {
@@ -124,18 +124,10 @@ func (i *IpfsBstore) PutMany(blocks []blocks.Block) error {
return nil
}
func (i *IpfsBstore) View(c cid.Cid, callback func([]byte) error) error {
blk, err := i.Get(c)
if err != nil {
return err
}
return callback(blk.RawData())
}
func (i *IpfsBstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
func (i *IPFSBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, xerrors.Errorf("not supported")
}
func (i *IpfsBstore) HashOnRead(enabled bool) {
func (i *IPFSBlockstore) HashOnRead(enabled bool) {
return // TODO: We could technically support this, but..
}
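As a rough usage sketch for the renamed constructors; the lotus blockstore import path, the multiaddr value, and the availability of a reachable IPFS daemon are assumptions here:

package main

import (
	"context"
	"log"

	"github.com/multiformats/go-multiaddr"

	"github.com/filecoin-project/lotus/blockstore"
)

func main() {
	ctx := context.Background()

	// Talks to the HTTP API of a local IPFS daemon; fails if none is running.
	localBs, err := blockstore.NewLocalIPFSBlockstore(ctx, true /* onlineMode */)
	if err != nil {
		log.Fatal(err)
	}
	_ = localBs

	// Talks to a remote IPFS HTTP API; the address below is a placeholder.
	ma, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
	if err != nil {
		log.Fatal(err)
	}
	remoteBs, err := blockstore.NewRemoteIPFSBlockstore(ctx, ma, false /* onlineMode */)
	if err != nil {
		log.Fatal(err)
	}
	_ = remoteBs // both constructors return an already-adapted Blockstore
}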

View File

@@ -8,8 +8,8 @@ import (
"github.com/ipfs/go-cid"
)
// NewTemporarySync returns a thread-safe temporary blockstore.
func NewTemporarySync() *SyncBlockstore {
// NewMemorySync returns a thread-safe in-memory blockstore.
func NewMemorySync() *SyncBlockstore {
return &SyncBlockstore{bs: make(MemBlockstore)}
}
@@ -45,31 +45,24 @@ func (m *SyncBlockstore) Get(k cid.Cid) (blocks.Block, error) {
return m.bs.Get(k)
}
// GetSize returns the CIDs mapped BlockSize
func (m *SyncBlockstore) GetSize(k cid.Cid) (int, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.GetSize(k)
}
// Put puts a given block to the underlying datastore
func (m *SyncBlockstore) Put(b blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.Put(b)
}
// PutMany puts a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
func (m *SyncBlockstore) PutMany(bs []blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.PutMany(bs)
}
// AllKeysChan returns a channel from which
// the CIDs in the Blockstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
func (m *SyncBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
m.mu.RLock()
defer m.mu.RUnlock()
@@ -77,8 +70,6 @@ func (m *SyncBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error
return m.bs.AllKeysChan(ctx)
}
// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (m *SyncBlockstore) HashOnRead(enabled bool) {
// noop
}
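A small concurrency sketch for the renamed constructor; the lotus blockstore import path is an assumption:

package main

import (
	"fmt"
	"sync"

	blocks "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/blockstore"
)

func main() {
	bs := blockstore.NewMemorySync() // thread-safe, in-memory

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			blk := blocks.NewBlock([]byte(fmt.Sprintf("block-%d", i)))
			_ = bs.Put(blk) // concurrent puts are serialized by the internal mutex
		}(i)
	}
	wg.Wait()
}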

View File

@@ -12,14 +12,16 @@ import (
"go.uber.org/multierr"
)
// TimedCacheBS is a blockstore that keeps blocks for at least the specified
// caching interval before discarding them. Garbage collection must be started
// and stopped by calling Start/Stop.
// TimedCacheBlockstore is a blockstore that keeps blocks for at least the
// specified caching interval before discarding them. Garbage collection must
// be started and stopped by calling Start/Stop.
//
// Under the covers, it's implemented with an active and an inactive blockstore
// that are rotated every cache time interval. This means all blocks will be
// stored at most 2x the cache interval.
type TimedCacheBS struct {
//
// Create a new instance by calling the NewTimedCacheBlockstore constructor.
type TimedCacheBlockstore struct {
mu sync.RWMutex
active, inactive MemBlockstore
clock clock.Clock
@@ -28,18 +30,17 @@ type TimedCacheBS struct {
doneRotatingCh chan struct{}
}
var _ Blockstore = (*TimedCacheBS)(nil)
func NewTimedCacheBS(cacheTime time.Duration) *TimedCacheBS {
return &TimedCacheBS{
func NewTimedCacheBlockstore(interval time.Duration) *TimedCacheBlockstore {
b := &TimedCacheBlockstore{
active: NewMemory(),
inactive: NewMemory(),
interval: cacheTime,
interval: interval,
clock: clock.New(),
}
return b
}
func (t *TimedCacheBS) Start(ctx context.Context) error {
func (t *TimedCacheBlockstore) Start(_ context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.closeCh != nil {
@@ -64,11 +65,11 @@ func (t *TimedCacheBS) Start(ctx context.Context) error {
return nil
}
func (t *TimedCacheBS) Stop(ctx context.Context) error {
func (t *TimedCacheBlockstore) Stop(_ context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.closeCh == nil {
return fmt.Errorf("not started started")
return fmt.Errorf("not started")
}
select {
case <-t.closeCh:
@@ -79,7 +80,7 @@ func (t *TimedCacheBS) Stop(ctx context.Context) error {
return nil
}
func (t *TimedCacheBS) rotate() {
func (t *TimedCacheBlockstore) rotate() {
newBs := NewMemory()
t.mu.Lock()
@@ -87,7 +88,7 @@ func (t *TimedCacheBS) rotate() {
t.mu.Unlock()
}
func (t *TimedCacheBS) Put(b blocks.Block) error {
func (t *TimedCacheBlockstore) Put(b blocks.Block) error {
// Don't check the inactive set here. We want to keep this block for at
// least one interval.
t.mu.Lock()
@@ -95,13 +96,13 @@ func (t *TimedCacheBS) Put(b blocks.Block) error {
return t.active.Put(b)
}
func (t *TimedCacheBS) PutMany(bs []blocks.Block) error {
func (t *TimedCacheBlockstore) PutMany(bs []blocks.Block) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.active.PutMany(bs)
}
func (t *TimedCacheBS) View(c cid.Cid, callback func([]byte) error) error {
func (t *TimedCacheBlockstore) View(c cid.Cid, callback func([]byte) error) error {
blk, err := t.Get(c)
if err != nil {
return err
@@ -109,7 +110,7 @@ func (t *TimedCacheBS) View(c cid.Cid, callback func([]byte) error) error {
return callback(blk.RawData())
}
func (t *TimedCacheBS) Get(k cid.Cid) (blocks.Block, error) {
func (t *TimedCacheBlockstore) Get(k cid.Cid) (blocks.Block, error) {
t.mu.RLock()
defer t.mu.RUnlock()
b, err := t.active.Get(k)
@@ -119,7 +120,7 @@ func (t *TimedCacheBS) Get(k cid.Cid) (blocks.Block, error) {
return b, err
}
func (t *TimedCacheBS) GetSize(k cid.Cid) (int, error) {
func (t *TimedCacheBlockstore) GetSize(k cid.Cid) (int, error) {
t.mu.RLock()
defer t.mu.RUnlock()
size, err := t.active.GetSize(k)
@@ -129,7 +130,7 @@ func (t *TimedCacheBS) GetSize(k cid.Cid) (int, error) {
return size, err
}
func (t *TimedCacheBS) Has(k cid.Cid) (bool, error) {
func (t *TimedCacheBlockstore) Has(k cid.Cid) (bool, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if has, err := t.active.Has(k); err != nil {
@@ -140,17 +141,17 @@ func (t *TimedCacheBS) Has(k cid.Cid) (bool, error) {
return t.inactive.Has(k)
}
func (t *TimedCacheBS) HashOnRead(_ bool) {
func (t *TimedCacheBlockstore) HashOnRead(_ bool) {
// no-op
}
func (t *TimedCacheBS) DeleteBlock(k cid.Cid) error {
func (t *TimedCacheBlockstore) DeleteBlock(k cid.Cid) error {
t.mu.Lock()
defer t.mu.Unlock()
return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k))
}
func (t *TimedCacheBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
t.mu.RLock()
defer t.mu.RUnlock()
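A minimal lifecycle sketch for the renamed type, assuming the lotus blockstore import path:

package main

import (
	"context"
	"fmt"
	"time"

	blocks "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/blockstore"
)

func main() {
	ctx := context.Background()

	// Blocks are kept for at least one interval and at most two before GC.
	tc := blockstore.NewTimedCacheBlockstore(30 * time.Second)
	if err := tc.Start(ctx); err != nil { // starts the background rotation loop
		panic(err)
	}
	defer tc.Stop(ctx) //nolint:errcheck

	blk := blocks.NewBlock([]byte("ephemeral data"))
	_ = tc.Put(blk)

	has, _ := tc.Has(blk.Cid())
	fmt.Println(has) // true while the block sits in the active or inactive set
}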

View File

@@ -12,8 +12,8 @@ import (
"github.com/ipfs/go-cid"
)
func TestTimedBSSimple(t *testing.T) {
tc := NewTimedCacheBS(10 * time.Millisecond)
func TestTimedCacheBlockstoreSimple(t *testing.T) {
tc := NewTimedCacheBlockstore(10 * time.Millisecond)
mClock := clock.NewMock()
mClock.Set(time.Now())
tc.clock = mClock

View File

@@ -295,7 +295,7 @@ func (t *TestDiffArray) Remove(key uint64, val *typegen.Deferred) error {
func newContextStore() Store {
ctx := context.Background()
bs := bstore.NewTemporarySync()
bs := bstore.NewMemorySync()
store := cbornode.NewCborStore(bs)
return WrapStore(ctx, store)
}

View File

@@ -36,7 +36,7 @@ func init() {
func TestMarketPredicates(t *testing.T) {
ctx := context.Background()
bs := bstore.NewTemporarySync()
bs := bstore.NewMemorySync()
store := adt2.WrapStore(ctx, cbornode.NewCborStore(bs))
oldDeal1 := &market2.DealState{
@@ -334,7 +334,7 @@ func TestMarketPredicates(t *testing.T) {
func TestMinerSectorChange(t *testing.T) {
ctx := context.Background()
bs := bstore.NewTemporarySync()
bs := bstore.NewMemorySync()
store := adt2.WrapStore(ctx, cbornode.NewCborStore(bs))
nextID := uint64(0)

View File

@@ -813,7 +813,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
}
func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewTemporarySync())
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync())
store := store.ActorStore(ctx, buf)
info, err := store.Put(ctx, new(types.StateInfo0))
@@ -965,7 +965,7 @@ func upgradeActorsV3Common(
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
config nv10.Config,
) (cid.Cid, error) {
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewTemporarySync())
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync())
store := store.ActorStore(ctx, buf)
// Load the state root.

View File

@@ -30,7 +30,7 @@ func TestIndexSeeks(t *testing.T) {
ctx := context.TODO()
nbs := blockstore.NewTemporarySync()
nbs := blockstore.NewMemorySync()
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
defer cs.Close() //nolint:errcheck

View File

@@ -28,7 +28,7 @@ import (
func TestDealStateMatcher(t *testing.T) {
ctx := context.Background()
bs := bstore.NewTemporarySync()
bs := bstore.NewMemorySync()
store := adt2.WrapStore(ctx, cbornode.NewCborStore(bs))
deal1 := &market2.DealState{

View File

@@ -44,7 +44,7 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r
// Write all incoming bitswap blocks into a temporary blockstore for two
// block times. If they validate, they'll be persisted later.
cache := blockstore.NewTimedCacheBS(2 * time.Duration(build.BlockDelaySecs) * time.Second)
cache := blockstore.NewTimedCacheBlockstore(2 * time.Duration(build.BlockDelaySecs) * time.Second)
lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start})
bitswapBs := blockstore.NewTieredBstore(bs, cache)

View File

@@ -25,9 +25,9 @@ func IpfsClientBlockstore(ipfsMaddr string, onlineMode bool) func(helpers.Metric
if err != nil {
return nil, xerrors.Errorf("parsing ipfs multiaddr: %w", err)
}
ipfsbs, err = blockstore.NewRemoteIpfsBstore(helpers.LifecycleCtx(mctx, lc), ma, onlineMode)
ipfsbs, err = blockstore.NewRemoteIPFSBlockstore(helpers.LifecycleCtx(mctx, lc), ma, onlineMode)
} else {
ipfsbs, err = blockstore.NewIpfsBstore(helpers.LifecycleCtx(mctx, lc), onlineMode)
ipfsbs, err = blockstore.NewLocalIPFSBlockstore(helpers.LifecycleCtx(mctx, lc), onlineMode)
}
if err != nil {
return nil, xerrors.Errorf("constructing ipfs blockstore: %w", err)

View File

@@ -160,7 +160,7 @@ func NewMemory(opts *MemRepoOptions) *MemRepo {
return &MemRepo{
repoLock: make(chan struct{}, 1),
blockstore: blockstore.WrapIDStore(blockstore.NewTemporarySync()),
blockstore: blockstore.WrapIDStore(blockstore.NewMemorySync()),
datastore: opts.Ds,
configF: opts.ConfigF,
keystore: opts.KeyStore,