piecereader: Move closer to storage

This commit is contained in:
Łukasz Magiera 2021-12-08 23:16:27 +01:00 committed by Jennifer Wang
parent 09a5c2e025
commit b21d3ded2f
12 changed files with 167 additions and 122 deletions

View File

@ -12,6 +12,7 @@ import (
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof" proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/dagstore/mount"
ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper" ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper"
commcid "github.com/filecoin-project/go-fil-commcid" commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
@ -384,12 +385,20 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
} }
} }
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 { if uint64(offset) != 0 {
panic("implme") panic("implme")
} }
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil return struct {
io.ReadCloser
io.Seeker
io.ReaderAt
}{
ReadCloser: ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])),
Seeker: nil,
ReaderAt: nil,
}, false, nil
} }
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) { func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {

View File

@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -27,7 +28,7 @@ type PieceProvider interface {
// default in most cases, but this might matter with future PoRep) // default in most cases, but this might matter with future PoRep)
// startOffset is added to the pieceOffset to get the starting reader offset. // startOffset is added to the pieceOffset to get the starting reader offset.
// The number of bytes that can be read is pieceSize-startOffset // The number of bytes that can be read is pieceSize-startOffset
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
} }
@ -71,28 +72,62 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read. // It will NOT try to schedule an Unseal of a sealed sector file for the read.
// //
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers. // Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) { func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
// acquire a lock purely for reading unsealed sectors // acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel() cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err) return nil, xerrors.Errorf("acquiring read sector lock: %w", err)
} }
// Reader returns a reader for an unsealed piece at the given offset in the given sector. pr, err := (&pieceReader{
// The returned reader will be nil if none of the workers has an unsealed sector file containing ctx: ctx,
// the unsealed piece. getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded())) startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
if err != nil {
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) // Reader returns a reader for an unsealed piece at the given offset in the given sector.
cancel() // The returned reader will be nil if none of the workers has an unsealed sector file containing
return nil, nil, err // the unsealed piece.
} r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffsetAligned.Padded()), size.Padded()-abi.PaddedPieceSize(startOffsetAligned.Padded()))
if r == nil { if err != nil {
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
return nil, err
}
if r == nil {
return nil, nil
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
}
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
return struct {
io.Reader
io.Closer
}{
Reader: bir,
Closer: r,
}, nil
},
len: size,
onClose: cancel,
pieceCid: pc,
}).init()
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
cancel() cancel()
return nil, err
} }
return r, cancel, nil return pr, err
} }
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
@ -101,7 +136,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal, // If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true. // the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false. // If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if err := pieceOffset.Valid(); err != nil { if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err) return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
} }
@ -109,9 +144,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err) return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
} }
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127 r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err) log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
@ -142,7 +175,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size) r, err = p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
if err != nil { if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err) log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err) return nil, true, xerrors.Errorf("read after unsealing: %w", err)
@ -156,34 +189,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
} }
upr, err := fr32.NewUnpadReader(r, size.Padded()) log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
if err != nil {
unlock()
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size) return r, uns, nil
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
return &funcCloser{
Reader: bir,
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
} }
type funcCloser struct {
io.Reader
close func() error
}
func (fc *funcCloser) Close() error { return fc.close() }

View File

@ -338,7 +338,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) { expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD) rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, rd) require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed) require.Equal(t, expectedHadToUnseal, isUnsealed)

View File

@ -1,4 +1,4 @@
package dagstore package sectorstorage
import ( import (
"bufio" "bufio"
@ -19,11 +19,14 @@ import (
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
var ReadBuf = 128 * (127 * 8) // unpadded(128k) var ReadBuf = 128 * (127 * 8) // unpadded(128k)
type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error)
type pieceReader struct { type pieceReader struct {
ctx context.Context ctx context.Context
api MinerAPI getReader pieceGetter
pieceCid cid.Cid pieceCid cid.Cid
len abi.UnpaddedPieceSize len abi.UnpaddedPieceSize
onClose context.CancelFunc
closed bool closed bool
seqAt int64 // next byte to be read by io.Reader seqAt int64 // next byte to be read by io.Reader
@ -37,10 +40,14 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1)) stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
p.rAt = 0 p.rAt = 0
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) p.r, err = p.getReader(p.ctx, uint64(p.rAt))
if err != nil { if err != nil {
return nil, err return nil, err
} }
if p.r == nil {
return nil, nil
}
p.br = bufio.NewReaderSize(p.r, ReadBuf) p.br = bufio.NewReaderSize(p.r, ReadBuf)
return p, nil return p, nil
@ -66,6 +73,10 @@ func (p *pieceReader) Close() error {
p.r = nil p.r = nil
} }
p.onClose()
p.closed = true
return nil return nil
} }
@ -127,7 +138,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
} }
p.rAt = off p.rAt = off
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) p.r, err = p.getReader(p.ctx, uint64(p.rAt))
p.br = bufio.NewReaderSize(p.r, ReadBuf) p.br = bufio.NewReaderSize(p.r, ReadBuf)
if err != nil { if err != nil {
return 0, xerrors.Errorf("getting backing reader: %w", err) return 0, xerrors.Errorf("getting backing reader: %w", err)

View File

@ -3,22 +3,21 @@ package dagstore
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-state-types/abi"
) )
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI //go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI
type MinerAPI interface { type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
Start(ctx context.Context) error Start(ctx context.Context) error
@ -27,7 +26,7 @@ type MinerAPI interface {
type SectorAccessor interface { type SectorAccessor interface {
retrievalmarket.SectorAccessor retrievalmarket.SectorAccessor
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error)
} }
type minerAPI struct { type minerAPI struct {
@ -100,10 +99,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
return false, nil return false, nil
} }
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
err := m.readyMgr.AwaitReady() err := m.readyMgr.AwaitReady()
if err != nil { if err != nil {
return nil, 0, err return nil, err
} }
// Throttle this path to avoid flooding the storage subsystem. // Throttle this path to avoid flooding the storage subsystem.
@ -114,11 +113,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
}) })
if err != nil { if err != nil {
return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
} }
if len(pieceInfo.Deals) == 0 { if len(pieceInfo.Deals) == 0 {
return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid) return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
} }
// prefer an unsealed sector containing the piece if one exists // prefer an unsealed sector containing the piece if one exists
@ -126,7 +125,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
deal := deal deal := deal
// Throttle this path to avoid flooding the storage subsystem. // Throttle this path to avoid flooding the storage subsystem.
var reader io.ReadCloser var reader mount.Reader
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
isUnsealed, err := m.sa.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) isUnsealed, err := m.sa.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil { if err != nil {
@ -136,7 +135,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
return nil return nil
} }
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing. // Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded()) reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err return err
}) })
@ -147,7 +146,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
if reader != nil { if reader != nil {
// we were able to obtain a reader for an already unsealed piece // we were able to obtain a reader for an already unsealed piece
return reader, deal.Length.Unpadded(), nil return reader, nil
} }
} }
@ -158,7 +157,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
// block for a long time with the current PoRep // block for a long time with the current PoRep
// //
// This path is unthrottled. // This path is unthrottled.
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded()) reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil { if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error()) log.Warn(lastErr.Error())
@ -166,10 +165,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
} }
// Successfully fetched the deal data so return a reader over the data // Successfully fetched the deal data so return a reader over the data
return reader, deal.Length.Unpadded(), nil return reader, nil
} }
return nil, 0, lastErr return nil, lastErr
} }
func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {

View File

@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/specs-actors/actors/builtin/paych"
@ -87,7 +88,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
} }
// Fetch the piece // Fetch the piece
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0) r, err := api.FetchUnsealedPiece(ctx, cid1)
if tc.expectErr { if tc.expectErr {
require.Error(t, err) require.Error(t, err)
return return
@ -159,7 +160,7 @@ func TestThrottle(t *testing.T) {
errgrp, ctx := errgroup.WithContext(context.Background()) errgrp, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
errgrp.Go(func() error { errgrp.Go(func() error {
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0) r, err := api.FetchUnsealedPiece(ctx, cid1)
if err == nil { if err == nil {
_ = r.Close() _ = r.Close()
} }
@ -203,10 +204,10 @@ type mockRPN struct {
} }
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length) return m.UnsealSectorAt(ctx, sectorID, offset, length)
} }
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
atomic.AddInt32(&m.calls, 1) atomic.AddInt32(&m.calls, 1)
m.lk.RLock() m.lk.RLock()
defer m.lk.RUnlock() defer m.lk.RUnlock()
@ -215,7 +216,13 @@ func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber,
if !ok { if !ok {
panic("sector not found") panic("sector not found")
} }
return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil return struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: io.NopCloser(bytes.NewBuffer([]byte(data[:]))),
}, nil
} }
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {

View File

@ -6,10 +6,9 @@ package mock_dagstore
import ( import (
context "context" context "context"
io "io"
reflect "reflect" reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi" mount "github.com/filecoin-project/dagstore/mount"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
) )
@ -38,19 +37,18 @@ func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder {
} }
// FetchUnsealedPiece mocks base method. // FetchUnsealedPiece mocks base method.
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid) (mount.Reader, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2) ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1)
ret0, _ := ret[0].(io.ReadCloser) ret0, _ := ret[0].(mount.Reader)
ret1, _ := ret[1].(abi.UnpaddedPieceSize) ret1, _ := ret[1].(error)
ret2, _ := ret[2].(error) return ret0, ret1
return ret0, ret1, ret2
} }
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece. // FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call { func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1)
} }
// GetUnpaddedCARSize mocks base method. // GetUnpaddedCARSize mocks base method.

View File

@ -56,11 +56,7 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
} }
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
return (&pieceReader{ return l.API.FetchUnsealedPiece(ctx, l.PieceCid)
ctx: ctx,
api: l.API,
pieceCid: l.PieceCid,
}).init()
} }
func (l *LotusMount) Info() mount.Info { func (l *LotusMount) Info() mount.Info {

View File

@ -2,6 +2,7 @@ package dagstore
import ( import (
"context" "context"
"io"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"strings" "strings"
@ -12,8 +13,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks" mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks"
) )
@ -30,8 +29,28 @@ func TestLotusMount(t *testing.T) {
mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl) mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1) mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1) mr1 := struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
ReaderAt: nil,
Seeker: nil,
}
mr2 := struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
ReaderAt: nil,
Seeker: nil,
}
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr1, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr2, nil).Times(1)
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1) mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
mnt, err := NewLotusMount(cid, mockLotusMountAPI) mnt, err := NewLotusMount(cid, mockLotusMountAPI)

View File

@ -6,9 +6,9 @@ import (
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -127,17 +127,20 @@ type wrappedSA struct {
retrievalmarket.SectorAccessor retrievalmarket.SectorAccessor
} }
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length) r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if startOffset > 0 { return struct {
if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil { io.ReadCloser
return nil, xerrors.Errorf("discard start off: %w", err) io.Seeker
} io.ReaderAt
} }{
return r, err ReadCloser: r,
Seeker: nil,
ReaderAt: nil,
}, err
} }
var _ SectorAccessor = &wrappedSA{} var _ SectorAccessor = &wrappedSA{}

View File

@ -3,7 +3,6 @@ package dagstore
import ( import (
"bytes" "bytes"
"context" "context"
"io"
"os" "os"
"testing" "testing"
"time" "time"
@ -12,8 +11,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard" "github.com/filecoin-project/dagstore/shard"
@ -192,7 +189,7 @@ func (m mockLotusMount) Start(ctx context.Context) error {
return nil return nil
} }
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { func (m mockLotusMount) FetchUnsealedPiece(context.Context, cid.Cid) (mount.Reader, error) {
panic("implement me") panic("implement me")
} }

View File

@ -4,8 +4,16 @@ import (
"context" "context"
"io" "io"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi"
specstorage "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -14,14 +22,6 @@ import (
"github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi"
specstorage "github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
) )
var log = logging.Logger("sectoraccessor") var log = logging.Logger("sectoraccessor")
@ -40,10 +40,10 @@ func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilde
} }
func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length) return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, length)
} }
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length) log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length)
si, err := sa.sectorsStatus(ctx, sectorID, false) si, err := sa.sectorsStatus(ctx, sectorID, false)
if err != nil { if err != nil {
@ -69,8 +69,8 @@ func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.Secto
} }
// Get a reader for the piece, unsealing the piece if necessary // Get a reader for the piece, unsealing the piece if necessary
log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid) log.Debugf("read piece in sector %d, pieceOffset %d, length %d from miner %d", sectorID, pieceOffset, length, mid)
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD) r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), length, si.Ticket.Value, commD)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err) return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
} }