Merge pull request #7746 from filecoin-project/feat/faster-retrieval

Make retrieval even faster
This commit is contained in:
Łukasz Magiera 2021-12-10 12:01:01 +01:00 committed by GitHub
commit 36f1a8f3ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 282 additions and 137 deletions

View File

@ -8,7 +8,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
)
var MTTresh = uint64(32 << 20)
var MTTresh = uint64(512 << 10)
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := (uint64(usz)) / MTTresh

View File

@ -16,13 +16,21 @@ type unpadReader struct {
work []byte
}
func BufSize(sz abi.PaddedPieceSize) int {
return int(MTTresh * mtChunkCount(sz))
}
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
buf := make([]byte, BufSize(sz))
return NewUnpadReaderBuf(src, sz, buf)
}
func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, MTTresh*mtChunkCount(sz))
return &unpadReader{
src: src,

View File

@ -12,6 +12,7 @@ import (
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/dagstore/mount"
ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
@ -384,12 +385,22 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
}
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])
return struct {
io.ReadCloser
io.Seeker
io.ReaderAt
}{
ReadCloser: ioutil.NopCloser(br),
Seeker: br,
ReaderAt: br,
}, false, nil
}
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {

View File

@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
@ -27,7 +28,7 @@ type PieceProvider interface {
// default in most cases, but this might matter with future PoRep)
// startOffset is added to the pieceOffset to get the starting reader offset.
// The number of bytes that can be read is pieceSize-startOffset
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}
@ -71,37 +72,91 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
return nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
if err != nil {
cancel()
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
cancel()
return nil, nil, err
return nil, err
}
if r == nil {
if rg == nil {
cancel()
return nil, nil
}
return r, cancel, nil
buf := make([]byte, fr32.BufSize(size.Padded()))
pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
r, err := rg(startOffsetAligned.Padded())
if err != nil {
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}
upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
}
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
return struct {
io.Reader
io.Closer
}{
Reader: bir,
Closer: funcCloser(func() error {
return r.Close()
}),
}, nil
},
len: size,
onClose: cancel,
pieceCid: pc,
}).init()
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
cancel()
return nil, err
}
return pr, err
}
type funcCloser func() error
func (f funcCloser) Close() error {
return f()
}
var _ io.Closer = funcCloser(nil)
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
}
@ -109,9 +164,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
@ -142,7 +195,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err = p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
@ -156,34 +209,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
unlock()
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
return &funcCloser{
Reader: bir,
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
return r, uns, nil
}
type funcCloser struct {
io.Reader
close func() error
}
func (fc *funcCloser) Close() error { return fc.close() }

View File

@ -338,7 +338,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
require.NoError(t, err)
require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed)

View File

@ -1,4 +1,4 @@
package dagstore
package sectorstorage
import (
"bufio"
@ -19,11 +19,14 @@ import (
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error)
type pieceReader struct {
ctx context.Context
api MinerAPI
pieceCid cid.Cid
len abi.UnpaddedPieceSize
ctx context.Context
getReader pieceGetter
pieceCid cid.Cid
len abi.UnpaddedPieceSize
onClose context.CancelFunc
closed bool
seqAt int64 // next byte to be read by io.Reader
@ -37,10 +40,14 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
p.rAt = 0
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
if err != nil {
return nil, err
}
if p.r == nil {
return nil, nil
}
p.br = bufio.NewReaderSize(p.r, ReadBuf)
return p, nil
@ -60,12 +67,19 @@ func (p *pieceReader) Close() error {
}
if p.r != nil {
if err := p.r.Close(); err != nil {
return err
}
if err := p.r.Close(); err != nil {
return err
}
p.r = nil
}
p.onClose()
p.closed = true
return nil
}
@ -127,7 +141,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
}
p.rAt = off
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
p.br = bufio.NewReaderSize(p.r, ReadBuf)
if err != nil {
return 0, xerrors.Errorf("getting backing reader: %w", err)

View File

@ -84,7 +84,6 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
// returns an error if it does NOT have the required sector file/dir.
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r)
id, err := storiface.ParseSectorID(vars["id"])

View File

@ -585,7 +585,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offse
// 1. no worker(local worker included) has an unsealed file for the given sector OR
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
ft := storiface.FTUnsealed
// check if we have the unsealed sector file locally
@ -623,7 +623,52 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// don't reuse between readers unless closed
f := pf
pf = nil
if f == nil {
f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size)
}
r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned))
if err != nil {
return nil, err
}
return struct {
io.Reader
io.Closer
}{
Reader: r,
Closer: funcCloser(func() error {
// if we already have a reader cached, close this one
if pf != nil {
if f == nil {
return nil
}
if pf == f {
pf = nil
}
tmp := f
f = nil
return tmp.Close()
}
// otherwise stash it away for reuse
pf = f
return nil
}),
}, nil
}, nil
}
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
@ -666,16 +711,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
continue
}
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
lastErr = err
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
return rd, nil
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
return nil, err
}
return rd, err
}, nil
}
}
@ -692,3 +739,11 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
}
var _ Store = &Remote{}
type funcCloser func() error
func (f funcCloser) Close() error {
return f()
}
var _ io.Closer = funcCloser(nil)

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -470,12 +471,20 @@ func TestReader(t *testing.T) {
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
rd, err := remoteStore.Reader(ctx, sectorRef, offset, size)
rdg, err := remoteStore.Reader(ctx, sectorRef, offset, size)
var rd io.ReadCloser
if tc.errStr != "" {
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
if rdg == nil {
require.Error(t, err)
require.Nil(t, rdg)
require.Contains(t, err.Error(), tc.errStr)
} else {
rd, err = rdg(0)
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
}
} else {
require.NoError(t, err)
}
@ -483,7 +492,10 @@ func TestReader(t *testing.T) {
if !tc.expectedNonNilReader {
require.Nil(t, rd)
} else {
require.NotNil(t, rd)
require.NotNil(t, rdg)
rd, err := rdg(0)
require.NoError(t, err)
defer func() {
require.NoError(t, rd.Close())
}()

View File

@ -3,22 +3,22 @@ package dagstore
import (
"context"
"fmt"
"io"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-state-types/abi"
)
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI
type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error)
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
Start(ctx context.Context) error
@ -27,7 +27,7 @@ type MinerAPI interface {
type SectorAccessor interface {
retrievalmarket.SectorAccessor
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error)
}
type minerAPI struct {
@ -100,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
return false, nil
}
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
err := m.readyMgr.AwaitReady()
if err != nil {
return nil, 0, err
return nil, err
}
// Throttle this path to avoid flooding the storage subsystem.
@ -114,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
})
if err != nil {
return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) == 0 {
return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}
// prefer an unsealed sector containing the piece if one exists
@ -126,7 +126,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
deal := deal
// Throttle this path to avoid flooding the storage subsystem.
var reader io.ReadCloser
var reader mount.Reader
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
isUnsealed, err := m.sa.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
@ -136,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
return nil
}
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})
@ -147,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
if reader != nil {
// we were able to obtain a reader for an already unsealed piece
return reader, deal.Length.Unpadded(), nil
return reader, nil
}
}
@ -158,7 +158,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
// block for a long time with the current PoRep
//
// This path is unthrottled.
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
@ -166,10 +166,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
}
// Successfully fetched the deal data so return a reader over the data
return reader, deal.Length.Unpadded(), nil
return reader, nil
}
return nil, 0, lastErr
return nil, lastErr
}
func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {

View File

@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
@ -87,7 +88,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
}
// Fetch the piece
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
r, err := api.FetchUnsealedPiece(ctx, cid1)
if tc.expectErr {
require.Error(t, err)
return
@ -159,7 +160,7 @@ func TestThrottle(t *testing.T) {
errgrp, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ {
errgrp.Go(func() error {
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
r, err := api.FetchUnsealedPiece(ctx, cid1)
if err == nil {
_ = r.Close()
}
@ -203,10 +204,10 @@ type mockRPN struct {
}
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length)
return m.UnsealSectorAt(ctx, sectorID, offset, length)
}
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
atomic.AddInt32(&m.calls, 1)
m.lk.RLock()
defer m.lk.RUnlock()
@ -215,7 +216,13 @@ func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber,
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil
return struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: io.NopCloser(bytes.NewBuffer([]byte(data[:]))),
}, nil
}
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {

View File

@ -6,10 +6,9 @@ package mock_dagstore
import (
context "context"
io "io"
reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi"
mount "github.com/filecoin-project/dagstore/mount"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
@ -38,19 +37,18 @@ func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder {
}
// FetchUnsealedPiece mocks base method.
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid) (mount.Reader, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2)
ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(abi.UnpaddedPieceSize)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1)
ret0, _ := ret[0].(mount.Reader)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call {
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1)
}
// GetUnpaddedCARSize mocks base method.

View File

@ -56,11 +56,7 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
}
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
return (&pieceReader{
ctx: ctx,
api: l.API,
pieceCid: l.PieceCid,
}).init()
return l.API.FetchUnsealedPiece(ctx, l.PieceCid)
}
func (l *LotusMount) Info() mount.Info {

View File

@ -2,6 +2,7 @@ package dagstore
import (
"context"
"io"
"io/ioutil"
"net/url"
"strings"
@ -12,8 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks"
)
@ -30,8 +29,28 @@ func TestLotusMount(t *testing.T) {
mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
mr1 := struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
ReaderAt: nil,
Seeker: nil,
}
mr2 := struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
ReaderAt: nil,
Seeker: nil,
}
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr1, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr2, nil).Times(1)
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
mnt, err := NewLotusMount(cid, mockLotusMountAPI)

View File

@ -6,9 +6,9 @@ import (
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -127,17 +127,20 @@ type wrappedSA struct {
retrievalmarket.SectorAccessor
}
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length)
if err != nil {
return nil, err
}
if startOffset > 0 {
if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil {
return nil, xerrors.Errorf("discard start off: %w", err)
}
}
return r, err
return struct {
io.ReadCloser
io.Seeker
io.ReaderAt
}{
ReadCloser: r,
Seeker: nil,
ReaderAt: nil,
}, err
}
var _ SectorAccessor = &wrappedSA{}

View File

@ -3,7 +3,6 @@ package dagstore
import (
"bytes"
"context"
"io"
"os"
"testing"
"time"
@ -12,8 +11,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
@ -192,7 +189,7 @@ func (m mockLotusMount) Start(ctx context.Context) error {
return nil
}
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
func (m mockLotusMount) FetchUnsealedPiece(context.Context, cid.Cid) (mount.Reader, error) {
panic("implement me")
}

View File

@ -4,8 +4,16 @@ import (
"context"
"io"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi"
specstorage "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/chain/types"
@ -14,14 +22,6 @@ import (
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi"
specstorage "github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("sectoraccessor")
@ -40,10 +40,10 @@ func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilde
}
func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length)
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, length)
}
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length)
si, err := sa.sectorsStatus(ctx, sectorID, false)
if err != nil {
@ -69,8 +69,8 @@ func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.Secto
}
// Get a reader for the piece, unsealing the piece if necessary
log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid)
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD)
log.Debugf("read piece in sector %d, pieceOffset %d, length %d from miner %d", sectorID, pieceOffset, length, mid)
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), length, si.Ticket.Value, commD)
if err != nil {
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
}