lotus/markets/dagstore/lotusaccessor.go

145 lines
4.3 KiB
Go
Raw Normal View History

package dagstore
import (
"context"
"io"
"github.com/filecoin-project/dagstore/throttle"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
2021-07-26 22:32:51 +00:00
"github.com/filecoin-project/go-fil-markets/shared"
)
// MaxConcurrentUnsealedFetches caps the amount of concurrent fetches for
// unsealed pieces, so that we don't saturate IO or the network too much,
// especially when bulk processing (e.g. at migration).
var MaxConcurrentUnsealedFetches = 3
2021-07-20 09:04:47 +00:00
type LotusAccessor interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
2021-07-21 12:04:07 +00:00
Start(ctx context.Context) error
}
2021-07-20 09:04:47 +00:00
type lotusAccessor struct {
pieceStore piecestore.PieceStore
rm retrievalmarket.RetrievalProviderNode
throttle throttle.Throttler
readyMgr *shared.ReadyManager
}
2021-07-20 09:04:47 +00:00
var _ LotusAccessor = (*lotusAccessor)(nil)
2021-07-26 22:32:51 +00:00
func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) LotusAccessor {
2021-07-20 09:04:47 +00:00
return &lotusAccessor{
pieceStore: store,
rm: rm,
throttle: throttle.Fixed(MaxConcurrentUnsealedFetches),
2021-07-21 12:04:07 +00:00
readyMgr: shared.NewReadyManager(),
}
}
2021-07-21 12:04:07 +00:00
func (m *lotusAccessor) Start(ctx context.Context) error {
// Wait for the piece store to startup
ready := make(chan error)
m.pieceStore.OnReady(func(err error) {
select {
case <-ctx.Done():
case ready <- err:
}
})
select {
case <-ctx.Done():
2021-07-21 12:04:07 +00:00
err := xerrors.Errorf("context cancelled waiting for piece store startup: %w", ctx.Err())
2021-07-21 12:19:09 +00:00
if ferr := m.readyMgr.FireReady(err); ferr != nil {
log.Warnw("failed to publish ready event", "err", ferr)
2021-07-21 12:04:07 +00:00
}
return err
case err := <-ready:
2021-07-21 12:19:09 +00:00
if ferr := m.readyMgr.FireReady(err); ferr != nil {
log.Warnw("failed to publish ready event", "err", ferr)
}
2021-07-21 12:04:07 +00:00
return err
}
}
2021-07-20 09:04:47 +00:00
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
2021-07-21 12:04:07 +00:00
err := m.readyMgr.AwaitReady()
if err != nil {
return nil, err
}
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
2021-07-20 09:04:47 +00:00
if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid)
}
// prefer an unsealed sector containing the piece if one exists
for _, deal := range pieceInfo.Deals {
isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
2021-07-20 09:04:47 +00:00
log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err)
continue
}
if isUnsealed {
var reader io.ReadCloser
// We want to throttle this path, as these copies will be downloaded
// immediately from the storage cluster without any unsealing
// necessary.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})
return reader, err
}
}
lastErr := xerrors.New("no sectors found to unseal from")
// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals {
2021-07-20 09:04:47 +00:00
// Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
2021-07-20 09:04:47 +00:00
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
continue
}
2021-07-20 09:04:47 +00:00
// Successfully fetched the deal data so return a reader over the data
return reader, nil
}
2021-07-20 09:04:47 +00:00
return nil, lastErr
}
func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
2021-07-21 12:04:07 +00:00
err := m.readyMgr.AwaitReady()
if err != nil {
return 0, err
}
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) == 0 {
return 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}
len := pieceInfo.Deals[0].Length
return uint64(len), nil
}