minor renaming.

Raúl Kripalani 2021-08-03 12:22:40 +01:00
parent f1d98361a6
commit 4076746141
8 changed files with 63 additions and 62 deletions


@@ -30,7 +30,7 @@ var MaxConcurrentStorageCalls = func() int {
return 100
}()
-type LotusAccessor interface {
+type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
@@ -44,9 +44,9 @@ type lotusAccessor struct {
readyMgr *shared.ReadyManager
}
-var _ LotusAccessor = (*lotusAccessor)(nil)
+var _ MinerAPI = (*lotusAccessor)(nil)
-func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) LotusAccessor {
+func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) MinerAPI {
return &lotusAccessor{
pieceStore: store,
rm: rm,
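
The rename is mechanical: LotusAccessor becomes MinerAPI and its constructor follows suit, with no behavioural change. A minimal caller sketch, assuming only the signatures visible in this hunk and the imports already present in the file (ps, rpn and pieceCid are placeholders wired up elsewhere in the node):

// Sketch only, not part of this commit.
func fetchPiece(ctx context.Context, ps piecestore.PieceStore, rpn retrievalmarket.RetrievalProviderNode, pieceCid cid.Cid) (io.ReadCloser, error) {
    api := NewMinerAPI(ps, rpn) // formerly NewLotusAccessor
    if err := api.Start(ctx); err != nil { // Start is used the same way in the tests below
        return nil, err
    }
    return api.FetchUnsealedPiece(ctx, pieceCid)
}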


@@ -74,7 +74,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
rpn := &mockRPN{
sectors: mockData,
}
-api := NewLotusAccessor(ps, rpn)
+api := NewMinerAPI(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add deals to piece store
@@ -114,7 +114,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
ps := getPieceStore(t)
rpn := &mockRPN{}
-api := NewLotusAccessor(ps, rpn)
+api := NewMinerAPI(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10
@@ -143,7 +143,7 @@ func TestThrottle(t *testing.T) {
unsealedSectorID: "foo",
},
}
-api := NewLotusAccessor(ps, rpn)
+api := NewMinerAPI(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10


@@ -18,7 +18,7 @@ var _ mount.Mount = (*LotusMount)(nil)
// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
-Api LotusAccessor
+API MinerAPI
PieceCid cid.Cid
}
@@ -29,14 +29,14 @@ type LotusMount struct {
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
-func NewLotusMountTemplate(api LotusAccessor) *LotusMount {
-return &LotusMount{Api: api}
+func NewLotusMountTemplate(api MinerAPI) *LotusMount {
+return &LotusMount{API: api}
}
-func NewLotusMount(pieceCid cid.Cid, api LotusAccessor) (*LotusMount, error) {
+func NewLotusMount(pieceCid cid.Cid, api MinerAPI) (*LotusMount, error) {
return &LotusMount{
PieceCid: pieceCid,
-Api: api,
+API: api,
}, nil
}
@@ -57,7 +57,7 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
}
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
-r, err := l.Api.FetchUnsealedPiece(ctx, l.PieceCid)
+r, err := l.API.FetchUnsealedPiece(ctx, l.PieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.PieceCid, err)
}
@@ -78,11 +78,11 @@ func (l *LotusMount) Close() error {
}
func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) {
-size, err := l.Api.GetUnpaddedCARSize(ctx, l.PieceCid)
+size, err := l.API.GetUnpaddedCARSize(ctx, l.PieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err)
}
-isUnsealed, err := l.Api.IsUnsealed(ctx, l.PieceCid)
+isUnsealed, err := l.API.IsUnsealed(ctx, l.PieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to verify if we have the unsealed piece %s: %w", l.PieceCid, err)
}
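
The mount is the DAG store's hook for materialising piece data on demand: Fetch streams the unsealed piece and Stat reports its size and sealed status through the same MinerAPI. A rough usage sketch under the renamed field, assuming nothing beyond the signatures shown in this file:

// Sketch only, not part of this commit.
func readPiece(ctx context.Context, api MinerAPI, pieceCid cid.Cid) (mount.Reader, error) {
    mnt, err := NewLotusMount(pieceCid, api)
    if err != nil {
        return nil, err
    }
    if _, err := mnt.Stat(ctx); err != nil { // size via GetUnpaddedCARSize, readiness via IsUnsealed
        return nil, err
    }
    return mnt.Fetch(ctx) // backed by FetchUnsealedPiece
}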


@@ -84,7 +84,7 @@ func TestLotusMountDeserialize(t *testing.T) {
require.NoError(t, err)
require.Equal(t, cid, mnt.PieceCid)
-require.Equal(t, api, mnt.Api)
+require.Equal(t, api, mnt.API)
// fails if cid is not valid
us = lotusScheme + "://" + "rand"


@@ -49,8 +49,8 @@ type Wrapper struct {
cancel context.CancelFunc
backgroundWg sync.WaitGroup
-dagStore DAGStore
-mountApi LotusAccessor
+dagst DAGStore
+mountApi MinerAPI
failureCh chan dagstore.ShardResult
traceCh chan dagstore.Trace
gcInterval time.Duration
@@ -58,7 +58,7 @@ type Wrapper struct {
var _ shared.DagStoreWrapper = (*Wrapper)(nil)
-func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrapper, error) {
+func NewWrapper(cfg MarketDAGStoreConfig, mountApi MinerAPI) (*Wrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
@@ -94,7 +94,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
}
return &Wrapper{
-dagStore: dagStore,
+dagst: dagStore,
mountApi: mountApi,
failureCh: failureCh,
traceCh: traceCh,
@@ -102,68 +102,69 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
}, nil
}
-func (ds *Wrapper) Start(ctx context.Context) error {
-ds.ctx, ds.cancel = context.WithCancel(ctx)
+func (w *Wrapper) Start(ctx context.Context) error {
+w.ctx, w.cancel = context.WithCancel(ctx)
// Run a go-routine to do DagStore GC.
-ds.backgroundWg.Add(1)
-go ds.dagStoreGCLoop()
+w.backgroundWg.Add(1)
+go w.gcLoop()
// run a go-routine to read the trace for debugging.
-ds.backgroundWg.Add(1)
-go ds.traceLoop()
+w.backgroundWg.Add(1)
+go w.traceLoop()
// Run a go-routine for shard recovery
-if dss, ok := ds.dagStore.(*dagstore.DAGStore); ok {
-ds.backgroundWg.Add(1)
-go dagstore.RecoverImmediately(ds.ctx, dss, ds.failureCh, maxRecoverAttempts, ds.backgroundWg.Done)
+if dss, ok := w.dagst.(*dagstore.DAGStore); ok {
+w.backgroundWg.Add(1)
+go dagstore.RecoverImmediately(w.ctx, dss, w.failureCh, maxRecoverAttempts, w.backgroundWg.Done)
}
-return ds.dagStore.Start(ctx)
+return w.dagst.Start(ctx)
}
-func (ds *Wrapper) traceLoop() {
-defer ds.backgroundWg.Done()
+func (w *Wrapper) traceLoop() {
+defer w.backgroundWg.Done()
-for ds.ctx.Err() == nil {
+for w.ctx.Err() == nil {
select {
// Log trace events from the DAG store
-case tr := <-ds.traceCh:
+case tr := <-w.traceCh:
log.Debugw("trace",
"shard-key", tr.Key.String(),
"op-type", tr.Op.String(),
"after", tr.After.String())
-case <-ds.ctx.Done():
+case <-w.ctx.Done():
return
}
}
}
-func (ds *Wrapper) dagStoreGCLoop() {
-defer ds.backgroundWg.Done()
+func (w *Wrapper) gcLoop() {
+defer w.backgroundWg.Done()
-gcTicker := time.NewTicker(ds.gcInterval)
-defer gcTicker.Stop()
+ticker := time.NewTicker(w.gcInterval)
+defer ticker.Stop()
-for ds.ctx.Err() == nil {
+for w.ctx.Err() == nil {
select {
// GC the DAG store on every tick
-case <-gcTicker.C:
-_, _ = ds.dagStore.GC(ds.ctx)
+case <-ticker.C:
+_, _ = w.dagst.GC(w.ctx)
// Exit when the DAG store wrapper is shutdown
-case <-ds.ctx.Done():
+case <-w.ctx.Done():
return
}
}
}
-func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
+func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
log.Debugf("acquiring shard for piece CID %s", pieceCid)
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
-err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
+err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
log.Debugf("sent message to acquire shard for piece CID %s", pieceCid)
if err != nil {
@@ -178,13 +179,13 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl
// we already have a transient file. However, we don't have it here
// and therefore we pass an empty file path.
carPath := ""
-if err := shared.RegisterShardSync(ctx, ds, pieceCid, carPath, false); err != nil {
+if err := shared.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil {
return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
}
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)
resch = make(chan dagstore.ShardResult, 1)
-if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
+if err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
}
}
@@ -212,10 +213,10 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl
return &Blockstore{ReadBlockstore: bs, Closer: res.Accessor}, nil
}
-func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
+func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
// Create a lotus mount with the piece CID
key := shard.KeyFromCID(pieceCid)
-mt, err := NewLotusMount(pieceCid, ds.mountApi)
+mt, err := NewLotusMount(pieceCid, w.mountApi)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}
@@ -225,7 +226,7 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath
ExistingTransient: carPath,
LazyInitialization: !eagerInit,
}
-err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
+err = w.dagst.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
}
@@ -234,20 +235,20 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath
return nil
}
-func (ds *Wrapper) Close() error {
+func (w *Wrapper) Close() error {
// Cancel the context
-ds.cancel()
+w.cancel()
// Close the DAG store
log.Info("will close the dagstore")
-if err := ds.dagStore.Close(); err != nil {
+if err := w.dagst.Close(); err != nil {
return xerrors.Errorf("failed to close DAG store: %w", err)
}
log.Info("dagstore closed")
// Wait for the background go routine to exit
log.Info("waiting for dagstore background wrapper routines to exit")
-ds.backgroundWg.Wait()
+w.backgroundWg.Wait()
log.Info("exited dagstore background wrapper routines")
return nil
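
The wrapper owns the DAG store lifecycle: Start launches the GC, trace and recovery goroutines, LoadShard acquires a shard for a piece (re-registering it on demand if unknown), and Close tears everything down. A lifecycle sketch using only the constructor, config fields and methods exercised by the tests in this commit; the directories and interval are placeholders:

// Sketch only, not part of this commit.
func loadPiece(ctx context.Context, api MinerAPI, pieceCid cid.Cid) error {
    w, err := NewWrapper(MarketDAGStoreConfig{ // formerly NewDagStoreWrapper
        TransientsDir: "/var/tmp/dagstore/transients",
        IndexDir:      "/var/tmp/dagstore/index",
        GCInterval:    time.Minute,
    }, api)
    if err != nil {
        return err
    }
    if err := w.Start(ctx); err != nil { // spawns gcLoop, traceLoop and shard recovery
        return err
    }
    defer w.Close()

    bs, err := w.LoadShard(ctx, pieceCid) // re-registers the shard if it is not known yet
    if err != nil {
        return err
    }
    defer bs.Close()
    // ... read blocks from bs ...
    return nil
}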


@@ -26,7 +26,7 @@ func TestWrapperAcquireRecovery(t *testing.T) {
require.NoError(t, err)
// Create a DAG store wrapper
-w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
+w, err := NewWrapper(MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
@@ -45,7 +45,7 @@ func TestWrapperAcquireRecovery(t *testing.T) {
},
register: make(chan shard.Key, 1),
}
-w.dagStore = mock
+w.dagst = mock
mybs, err := w.LoadShard(ctx, pieceCid)
require.NoError(t, err)
@@ -76,7 +76,7 @@ func TestWrapperBackground(t *testing.T) {
ctx := context.Background()
// Create a DAG store wrapper
-w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
+w, err := NewWrapper(MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
@@ -89,7 +89,7 @@ func TestWrapperBackground(t *testing.T) {
recover: make(chan shard.Key, 1),
close: make(chan struct{}, 1),
}
-w.dagStore = mock
+w.dagst = mock
// Start up the wrapper
err = w.Start(ctx)


@@ -148,7 +148,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
// DAG Store
-Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor),
+Override(new(dagstore.MinerAPI), modules.NewLotusAccessor),
Override(new(*dagstore.Wrapper), modules.DAGStoreWrapper),
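
The two overrides are dependency-injection bindings: modules.NewLotusAccessor now satisfies the dagstore.MinerAPI interface, and modules.DAGStoreWrapper (which consumes it, see the last file in this commit) satisfies *dagstore.Wrapper. Expressed in plain fx terms the equivalent wiring would be roughly the following; lotus actually assembles its graph through the Override DSL above, so this is illustrative only:

// Illustrative only; not how the lotus builder is actually assembled.
fx.Provide(
    modules.NewLotusAccessor, // provides dagstore.MinerAPI
    modules.DAGStoreWrapper,  // consumes dagstore.MinerAPI, provides *dagstore.Wrapper
)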
// Markets (retrieval)


@@ -23,8 +23,8 @@ import (
func NewLotusAccessor(lc fx.Lifecycle,
pieceStore dtypes.ProviderPieceStore,
rpn retrievalmarket.RetrievalProviderNode,
-) (dagstore.LotusAccessor, error) {
-mountApi := dagstore.NewLotusAccessor(pieceStore, rpn)
+) (dagstore.MinerAPI, error) {
+mountApi := dagstore.NewMinerAPI(pieceStore, rpn)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
@@ -47,7 +47,7 @@ func NewLotusAccessor(lc fx.Lifecycle,
func DAGStoreWrapper(
lc fx.Lifecycle,
r repo.LockedRepo,
-lotusAccessor dagstore.LotusAccessor,
+lotusAccessor dagstore.MinerAPI,
) (*dagstore.Wrapper, error) {
dir := filepath.Join(r.Path(), dagStore)
ds, err := newDAGStoreDatastore(dir)
@@ -74,7 +74,7 @@ func DAGStoreWrapper(
MaxConcurrentReadyFetches: maxCopies,
}
-dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)
+dsw, err := dagstore.NewWrapper(cfg, lotusAccessor)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
}