fix: address review comments

This commit is contained in:
Dirk McCormick 2021-07-20 11:04:47 +02:00
parent b6a7a8c987
commit 44036d9a4a
6 changed files with 134 additions and 81 deletions

View File

@ -11,32 +11,32 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)
type LotusMountAPI interface {
type LotusAccessor interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
}
type lotusMountApiImpl struct {
type lotusAccessor struct {
pieceStore piecestore.PieceStore
rm retrievalmarket.RetrievalProviderNode
}
var _ LotusMountAPI = (*lotusMountApiImpl)(nil)
var _ LotusAccessor = (*lotusAccessor)(nil)
func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusMountApiImpl {
return &lotusMountApiImpl{
func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor {
return &lotusAccessor{
pieceStore: store,
rm: rm,
}
}
func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) <= 0 {
if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid)
}
@ -44,6 +44,7 @@ func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid
for _, deal := range pieceInfo.Deals {
isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err)
continue
}
if isUnsealed {
@ -58,16 +59,23 @@ func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid
lastErr := xerrors.New("no sectors found to unseal from")
// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals {
// Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err == nil {
return reader, nil
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
continue
}
lastErr = err
// Successfully fetched the deal data so return a reader over the data
return reader, nil
}
return nil, lastErr
}
func (m *lotusMountApiImpl) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
func (m *lotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)

View File

@ -24,7 +24,7 @@ import (
const unsealedSectorID = abi.SectorNumber(1)
const sealedSectorID = abi.SectorNumber(2)
func TestLotusMountApiFetchUnsealedPiece(t *testing.T) {
func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa")
@ -93,7 +93,7 @@ func TestLotusMountApiFetchUnsealedPiece(t *testing.T) {
}
}
func TestLotusMountApiGetUnpaddedCARSize(t *testing.T) {
func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)

View File

@ -2,7 +2,6 @@ package dagstore
import (
"context"
"fmt"
"io"
"net/url"
@ -13,14 +12,13 @@ import (
)
const lotusScheme = "lotus"
const mountURLTemplate = "%s://%s"
var _ mount.Mount = (*LotusMount)(nil)
// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
api LotusMountAPI
api LotusAccessor
pieceCid cid.Cid
}
@ -29,11 +27,11 @@ type LotusMount struct {
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
func NewLotusMountTemplate(api LotusMountAPI) *LotusMount {
func NewLotusMountTemplate(api LotusAccessor) *LotusMount {
return &LotusMount{api: api}
}
func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) {
func NewLotusMount(pieceCid cid.Cid, api LotusAccessor) (*LotusMount, error) {
return &LotusMount{
pieceCid: pieceCid,
api: api,
@ -41,21 +39,12 @@ func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) {
}
func (l *LotusMount) Serialize() *url.URL {
u := fmt.Sprintf(mountURLTemplate, lotusScheme, l.pieceCid.String())
url, err := url.Parse(u)
if err != nil {
// Should never happen
panic(xerrors.Errorf("failed to parse mount URL '%s': %w", u, err))
return &url.URL{
Host: l.pieceCid.String(),
}
return url
}
func (l *LotusMount) Deserialize(u *url.URL) error {
if u.Scheme != lotusScheme {
return xerrors.Errorf("scheme '%s' for URL '%s' does not match required scheme '%s'", u.Scheme, u, lotusScheme)
}
pieceCid, err := cid.Decode(u.Host)
if err != nil {
return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err)

View File

@ -2,7 +2,6 @@ package dagstore
import (
"context"
"fmt"
"io/ioutil"
"net/url"
"strings"
@ -67,13 +66,13 @@ func TestLotusMount(t *testing.T) {
}
func TestLotusMountDeserialize(t *testing.T) {
api := &lotusMountApiImpl{}
api := &lotusAccessor{}
bgen := blocksutil.NewBlockGenerator()
cid := bgen.Next().Cid()
// success
us := fmt.Sprintf(mountURLTemplate, lotusScheme, cid.String())
us := lotusScheme + "://" + cid.String()
u, err := url.Parse(us)
require.NoError(t, err)
@ -84,17 +83,8 @@ func TestLotusMountDeserialize(t *testing.T) {
require.Equal(t, cid, mnt.pieceCid)
require.Equal(t, api, mnt.api)
// fails if scheme is not Lotus
us = fmt.Sprintf(mountURLTemplate, "http", cid.String())
u, err = url.Parse(us)
require.NoError(t, err)
err = mnt.Deserialize(u)
require.Error(t, err)
require.Contains(t, err.Error(), "does not match")
// fails if cid is not valid
us = fmt.Sprintf(mountURLTemplate, lotusScheme, "rand")
us = lotusScheme + "://" + "rand"
u, err = url.Parse(us)
require.NoError(t, err)
err = mnt.Deserialize(u)

View File

@ -2,8 +2,10 @@ package dagstore
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"
"github.com/ipfs/go-cid"
@ -35,23 +37,25 @@ type closableBlockstore struct {
}
type dagStoreWrapper struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
backgroundWg sync.WaitGroup
dagStore *dagstore.DAGStore
mountApi LotusMountAPI
dagStore *dagstore.DAGStore
mountApi LotusAccessor
failureCh chan dagstore.ShardResult
}
var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil)
func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*dagStoreWrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
return nil, xerrors.Errorf("failed to create registry: %w", err)
}
// The dagstore will write Shard failures to the `failureCh` here.
failureCh := make(chan dagstore.ShardResult, 1)
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
@ -65,37 +69,77 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagS
return nil, xerrors.Errorf("failed to create DAG store: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
dw := &dagStoreWrapper{
ctx: ctx,
cancel: cancel,
dagStore: dagStore,
mountApi: mountApi,
}
dw.wg.Add(1)
// the dagstore will write Shard failures to the `failureCh` here. Run a go-routine to handle them.
go dw.handleFailures(failureCh)
return dw, nil
return &dagStoreWrapper{
dagStore: dagStore,
mountApi: mountApi,
failureCh: failureCh,
}, nil
}
func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) {
defer ds.wg.Done()
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()
func (ds *dagStoreWrapper) Start(ctx context.Context) {
ds.ctx, ds.cancel = context.WithCancel(ctx)
select {
case <-ticker.C:
_, _ = ds.dagStore.GC(ds.ctx)
case f := <-failureCh:
log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
ds.backgroundWg.Add(1)
// Run a go-routine to handle failures and GC
go ds.background(ds.failureCh)
}
func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) {
defer ds.backgroundWg.Done()
gcTicker := time.NewTicker(gcInterval)
defer gcTicker.Stop()
recoverShardResults := make(chan dagstore.ShardResult, 32)
var recShardResCount int32
done := make(chan struct{})
defer close(done)
go func() {
// Consume recover shard results
for {
select {
// When the DAG store wrapper shuts down, drain the channel so as
// not to block the DAG store
case <-done:
for i := atomic.LoadInt32(&recShardResCount); i > 0; i-- {
res := <-recoverShardResults
if res.Error != nil {
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
}
}
return
case res := <-recoverShardResults:
atomic.AddInt32(&recShardResCount, -1)
if res.Error != nil {
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
}
}
}
}()
for ds.ctx.Err() != nil {
select {
// GC the DAG store on every tick
case <-gcTicker.C:
_, _ = ds.dagStore.GC(ds.ctx)
// Handle shard failures by attempting to recover the shard
case f := <-failureCh:
atomic.AddInt32(&recShardResCount, 1)
log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
atomic.AddInt32(&recShardResCount, -1)
}
// Exit when the DAG store wrapper is shutdown
case <-ds.ctx.Done():
return
}
case <-ds.ctx.Done():
return
}
}
@ -105,7 +149,7 @@ func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (car
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
if err != nil {
if xerrors.Unwrap(err) != dagstore.ErrShardUnknown {
if !errors.Is(err, dagstore.ErrShardUnknown) {
return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
}
@ -122,7 +166,10 @@ func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (car
}
}
// TODO: Can I rely on AcquireShard to return an error if the context times out?
// TODO: The context is not yet being actively monitored by the DAG store,
// so we need to select against ctx.Done() until the following issue is
// implemented:
// https://github.com/filecoin-project/dagstore/issues/39
var res dagstore.ShardResult
select {
case <-ctx.Done():
@ -163,12 +210,16 @@ func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid,
}
func (ds *dagStoreWrapper) Close() error {
// Cancel the context
ds.cancel()
// Close the DAG store
if err := ds.dagStore.Close(); err != nil {
return err
return xerrors.Errorf("failed to close DAG store: %w", err)
}
ds.cancel()
ds.wg.Wait()
// Wait for the background go routine to exit
ds.backgroundWg.Wait()
return nil
}

View File

@ -576,6 +576,7 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
}
func DagStoreWrapper(
lc fx.Lifecycle,
ds dtypes.MetadataDS,
r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore,
@ -589,7 +590,21 @@ func DagStoreWrapper(
Datastore: dagStoreDS,
}
mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn)
return dagstore.NewDagStoreWrapper(cfg, mountApi)
dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
dsw.Start(ctx)
return nil
},
OnStop: func(context.Context) error {
return dsw.Close()
},
})
return dsw, nil
}
func StorageProvider(minerAddress dtypes.MinerAddress,