Merge pull request #7693 from filecoin-project/feat/randacc-dagstore-mount
Make small retrieval 200x faster
This commit is contained in:
commit
26c3752f48
6
extern/sector-storage/mock/mock.go
vendored
6
extern/sector-storage/mock/mock.go
vendored
@ -384,12 +384,12 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
||||
if offset != 0 {
|
||||
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
||||
if uint64(offset) != 0 {
|
||||
panic("implme")
|
||||
}
|
||||
|
||||
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil
|
||||
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
|
||||
|
43
extern/sector-storage/piece_provider.go
vendored
43
extern/sector-storage/piece_provider.go
vendored
@ -23,7 +23,11 @@ type Unsealer interface {
|
||||
|
||||
type PieceProvider interface {
|
||||
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
|
||||
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
|
||||
// pieceOffset + pieceSize specify piece bounds for unsealing (note: with SDR the entire sector will be unsealed by
|
||||
// default in most cases, but this might matter with future PoRep)
|
||||
// startOffset is added to the pieceOffset to get the starting reader offset.
|
||||
// The number of bytes that can be read is pieceSize-startOffset
|
||||
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
|
||||
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
|
||||
}
|
||||
|
||||
@ -67,7 +71,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
|
||||
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
|
||||
//
|
||||
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
|
||||
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
|
||||
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
|
||||
// acquire a lock purely for reading unsealed sectors
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
|
||||
@ -78,7 +82,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
|
||||
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
|
||||
// The returned reader will be nil if none of the workers has an unsealed sector file containing
|
||||
// the unsealed piece.
|
||||
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
|
||||
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
|
||||
if err != nil {
|
||||
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
|
||||
cancel()
|
||||
@ -97,20 +101,22 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
|
||||
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
|
||||
// the returned boolean parameter will be set to true.
|
||||
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
|
||||
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
||||
if err := offset.Valid(); err != nil {
|
||||
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
|
||||
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
||||
if err := pieceOffset.Valid(); err != nil {
|
||||
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
|
||||
}
|
||||
if err := size.Validate(); err != nil {
|
||||
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
|
||||
}
|
||||
|
||||
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size)
|
||||
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
|
||||
|
||||
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
|
||||
|
||||
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
|
||||
|
||||
if xerrors.Is(err, storiface.ErrSectorNotFound) {
|
||||
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
|
||||
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
|
||||
err = nil
|
||||
}
|
||||
if err != nil {
|
||||
@ -129,14 +135,14 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
||||
if unsealed == cid.Undef {
|
||||
commd = nil
|
||||
}
|
||||
if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil {
|
||||
if err := p.uns.SectorsUnsealPiece(ctx, sector, pieceOffset, size, ticket, commd); err != nil {
|
||||
log.Errorf("failed to SectorsUnsealPiece: %s", err)
|
||||
return nil, false, xerrors.Errorf("unsealing piece: %w", err)
|
||||
}
|
||||
|
||||
log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
|
||||
log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
|
||||
|
||||
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size)
|
||||
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
|
||||
if err != nil {
|
||||
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
|
||||
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
|
||||
@ -145,9 +151,9 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
||||
log.Errorf("got no reader after unsealing piece")
|
||||
return nil, true, xerrors.Errorf("got no reader after unsealing piece")
|
||||
}
|
||||
log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
|
||||
log.Debugf("got a reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
|
||||
} else {
|
||||
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size)
|
||||
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
|
||||
}
|
||||
|
||||
upr, err := fr32.NewUnpadReader(r, size.Padded())
|
||||
@ -156,10 +162,17 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
||||
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
|
||||
}
|
||||
|
||||
log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
|
||||
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)
|
||||
|
||||
bir := bufio.NewReaderSize(upr, 127)
|
||||
if startOffset > uint64(startOffsetAligned) {
|
||||
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
|
||||
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return &funcCloser{
|
||||
Reader: bufio.NewReaderSize(upr, 127),
|
||||
Reader: bir,
|
||||
close: func() error {
|
||||
err = r.Close()
|
||||
unlock()
|
||||
|
2
extern/sector-storage/piece_provider_test.go
vendored
2
extern/sector-storage/piece_provider_test.go
vendored
@ -337,7 +337,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp
|
||||
|
||||
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
|
||||
expectedHadToUnseal bool, expectedBytes []byte) {
|
||||
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
|
||||
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, rd)
|
||||
require.Equal(t, expectedHadToUnseal, isUnsealed)
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"io"
|
||||
|
||||
"github.com/filecoin-project/dagstore/throttle"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -14,23 +15,31 @@ import (
|
||||
"github.com/filecoin-project/go-fil-markets/shared"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI
|
||||
|
||||
type MinerAPI interface {
|
||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error)
|
||||
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
|
||||
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
|
||||
Start(ctx context.Context) error
|
||||
}
|
||||
|
||||
type SectorAccessor interface {
|
||||
retrievalmarket.SectorAccessor
|
||||
|
||||
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
type minerAPI struct {
|
||||
pieceStore piecestore.PieceStore
|
||||
sa retrievalmarket.SectorAccessor
|
||||
sa SectorAccessor
|
||||
throttle throttle.Throttler
|
||||
readyMgr *shared.ReadyManager
|
||||
}
|
||||
|
||||
var _ MinerAPI = (*minerAPI)(nil)
|
||||
|
||||
func NewMinerAPI(store piecestore.PieceStore, sa retrievalmarket.SectorAccessor, concurrency int) MinerAPI {
|
||||
func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI {
|
||||
return &minerAPI{
|
||||
pieceStore: store,
|
||||
sa: sa,
|
||||
@ -91,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
|
||||
err := m.readyMgr.AwaitReady()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Throttle this path to avoid flooding the storage subsystem.
|
||||
@ -105,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||
return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||
}
|
||||
|
||||
if len(pieceInfo.Deals) == 0 {
|
||||
return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
|
||||
return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
|
||||
}
|
||||
|
||||
// prefer an unsealed sector containing the piece if one exists
|
||||
@ -127,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
|
||||
return nil
|
||||
}
|
||||
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
|
||||
reader, err = m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
||||
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
|
||||
return err
|
||||
})
|
||||
|
||||
@ -138,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
|
||||
|
||||
if reader != nil {
|
||||
// we were able to obtain a reader for an already unsealed piece
|
||||
return reader, nil
|
||||
return reader, deal.Length.Unpadded(), nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -149,7 +158,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
|
||||
// block for a long time with the current PoRep
|
||||
//
|
||||
// This path is unthrottled.
|
||||
reader, err := m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
||||
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
|
||||
if err != nil {
|
||||
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
|
||||
log.Warn(lastErr.Error())
|
||||
@ -157,10 +166,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
|
||||
}
|
||||
|
||||
// Successfully fetched the deal data so return a reader over the data
|
||||
return reader, nil
|
||||
return reader, deal.Length.Unpadded(), nil
|
||||
}
|
||||
|
||||
return nil, lastErr
|
||||
return nil, 0, lastErr
|
||||
}
|
||||
|
||||
func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||
|
@ -87,7 +87,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
||||
}
|
||||
|
||||
// Fetch the piece
|
||||
r, err := api.FetchUnsealedPiece(ctx, cid1)
|
||||
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
|
||||
if tc.expectErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
@ -159,7 +159,7 @@ func TestThrottle(t *testing.T) {
|
||||
errgrp, ctx := errgroup.WithContext(context.Background())
|
||||
for i := 0; i < 10; i++ {
|
||||
errgrp.Go(func() error {
|
||||
r, err := api.FetchUnsealedPiece(ctx, cid1)
|
||||
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
|
||||
if err == nil {
|
||||
_ = r.Close()
|
||||
}
|
||||
@ -203,6 +203,10 @@ type mockRPN struct {
|
||||
}
|
||||
|
||||
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length)
|
||||
}
|
||||
|
||||
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
atomic.AddInt32(&m.calls, 1)
|
||||
m.lk.RLock()
|
||||
defer m.lk.RUnlock()
|
||||
@ -211,7 +215,7 @@ func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, o
|
||||
if !ok {
|
||||
panic("sector not found")
|
||||
}
|
||||
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil
|
||||
return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil
|
||||
}
|
||||
|
||||
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
|
||||
|
@ -1,5 +1,5 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: lotusaccessor.go
|
||||
// Source: github.com/filecoin-project/lotus/markets/dagstore (interfaces: MinerAPI)
|
||||
|
||||
// Package mock_dagstore is a generated GoMock package.
|
||||
package mock_dagstore
|
||||
@ -9,88 +9,90 @@ import (
|
||||
io "io"
|
||||
reflect "reflect"
|
||||
|
||||
abi "github.com/filecoin-project/go-state-types/abi"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// MockLotusAccessor is a mock of LotusAccessor interface.
|
||||
type MockLotusAccessor struct {
|
||||
// MockMinerAPI is a mock of MinerAPI interface.
|
||||
type MockMinerAPI struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockLotusAccessorMockRecorder
|
||||
recorder *MockMinerAPIMockRecorder
|
||||
}
|
||||
|
||||
// MockLotusAccessorMockRecorder is the mock recorder for MockLotusAccessor.
|
||||
type MockLotusAccessorMockRecorder struct {
|
||||
mock *MockLotusAccessor
|
||||
// MockMinerAPIMockRecorder is the mock recorder for MockMinerAPI.
|
||||
type MockMinerAPIMockRecorder struct {
|
||||
mock *MockMinerAPI
|
||||
}
|
||||
|
||||
// NewMockLotusAccessor creates a new mock instance.
|
||||
func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor {
|
||||
mock := &MockLotusAccessor{ctrl: ctrl}
|
||||
mock.recorder = &MockLotusAccessorMockRecorder{mock}
|
||||
// NewMockMinerAPI creates a new mock instance.
|
||||
func NewMockMinerAPI(ctrl *gomock.Controller) *MockMinerAPI {
|
||||
mock := &MockMinerAPI{ctrl: ctrl}
|
||||
mock.recorder = &MockMinerAPIMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder {
|
||||
func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// FetchUnsealedPiece mocks base method.
|
||||
func (m *MockLotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "FetchUnsealedPiece", ctx, pieceCid)
|
||||
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(io.ReadCloser)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
ret1, _ := ret[1].(abi.UnpaddedPieceSize)
|
||||
ret2, _ := ret[2].(error)
|
||||
return ret0, ret1, ret2
|
||||
}
|
||||
|
||||
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
|
||||
func (mr *MockLotusAccessorMockRecorder) FetchUnsealedPiece(ctx, pieceCid interface{}) *gomock.Call {
|
||||
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockLotusAccessor)(nil).FetchUnsealedPiece), ctx, pieceCid)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// GetUnpaddedCARSize mocks base method.
|
||||
func (m *MockLotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||
func (m *MockMinerAPI) GetUnpaddedCARSize(arg0 context.Context, arg1 cid.Cid) (uint64, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", ctx, pieceCid)
|
||||
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", arg0, arg1)
|
||||
ret0, _ := ret[0].(uint64)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize.
|
||||
func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(ctx, pieceCid interface{}) *gomock.Call {
|
||||
func (mr *MockMinerAPIMockRecorder) GetUnpaddedCARSize(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), ctx, pieceCid)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockMinerAPI)(nil).GetUnpaddedCARSize), arg0, arg1)
|
||||
}
|
||||
|
||||
// IsUnsealed mocks base method.
|
||||
func (m *MockLotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
|
||||
func (m *MockMinerAPI) IsUnsealed(arg0 context.Context, arg1 cid.Cid) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "IsUnsealed", ctx, pieceCid)
|
||||
ret := m.ctrl.Call(m, "IsUnsealed", arg0, arg1)
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// IsUnsealed indicates an expected call of IsUnsealed.
|
||||
func (mr *MockLotusAccessorMockRecorder) IsUnsealed(ctx, pieceCid interface{}) *gomock.Call {
|
||||
func (mr *MockMinerAPIMockRecorder) IsUnsealed(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockLotusAccessor)(nil).IsUnsealed), ctx, pieceCid)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockMinerAPI)(nil).IsUnsealed), arg0, arg1)
|
||||
}
|
||||
|
||||
// Start mocks base method.
|
||||
func (m *MockLotusAccessor) Start(ctx context.Context) error {
|
||||
func (m *MockMinerAPI) Start(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Start", ctx)
|
||||
ret := m.ctrl.Call(m, "Start", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Start indicates an expected call of Start.
|
||||
func (mr *MockLotusAccessorMockRecorder) Start(ctx interface{}) *gomock.Call {
|
||||
func (mr *MockMinerAPIMockRecorder) Start(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLotusAccessor)(nil).Start), ctx)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockMinerAPI)(nil).Start), arg0)
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package dagstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/url"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
@ -57,19 +56,19 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
|
||||
}
|
||||
|
||||
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
|
||||
r, err := l.API.FetchUnsealedPiece(ctx, l.PieceCid)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.PieceCid, err)
|
||||
}
|
||||
return &readCloser{r}, nil
|
||||
return (&pieceReader{
|
||||
ctx: ctx,
|
||||
api: l.API,
|
||||
pieceCid: l.PieceCid,
|
||||
}).init()
|
||||
}
|
||||
|
||||
func (l *LotusMount) Info() mount.Info {
|
||||
return mount.Info{
|
||||
Kind: mount.KindRemote,
|
||||
AccessSequential: true,
|
||||
AccessSeek: false,
|
||||
AccessRandom: false,
|
||||
AccessSeek: true,
|
||||
AccessRandom: true,
|
||||
}
|
||||
}
|
||||
|
||||
@ -94,17 +93,3 @@ func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) {
|
||||
Ready: isUnsealed,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type readCloser struct {
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
var _ mount.Reader = (*readCloser)(nil)
|
||||
|
||||
func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
return 0, xerrors.Errorf("ReadAt called but not implemented")
|
||||
}
|
||||
|
||||
func (r *readCloser) Seek(offset int64, whence int) (int64, error) {
|
||||
return 0, xerrors.Errorf("Seek called but not implemented")
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks"
|
||||
)
|
||||
@ -26,12 +27,12 @@ func TestLotusMount(t *testing.T) {
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// create a mock lotus api that returns the reader we want
|
||||
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl)
|
||||
mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
|
||||
|
||||
mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
|
||||
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||
|
||||
mnt, err := NewLotusMount(cid, mockLotusMountAPI)
|
||||
@ -109,7 +110,7 @@ func TestLotusMountRegistration(t *testing.T) {
|
||||
// when test is done, assert expectations on all mock objects.
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl)
|
||||
mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
|
||||
registry := mount.NewRegistry()
|
||||
err = registry.Register(lotusScheme, mountTemplate(mockLotusMountAPI))
|
||||
require.NoError(t, err)
|
||||
|
139
markets/dagstore/piecereader.go
Normal file
139
markets/dagstore/piecereader.go
Normal file
@ -0,0 +1,139 @@
|
||||
package dagstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
// For small read skips, it's faster to "burn" some bytes than to setup new sector reader.
|
||||
// Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB.
|
||||
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
|
||||
|
||||
type pieceReader struct {
|
||||
ctx context.Context
|
||||
api MinerAPI
|
||||
pieceCid cid.Cid
|
||||
len abi.UnpaddedPieceSize
|
||||
|
||||
closed bool
|
||||
seqAt int64 // next byte to be read by io.Reader
|
||||
|
||||
r io.ReadCloser
|
||||
rAt int64
|
||||
}
|
||||
|
||||
func (p *pieceReader) init() (_ *pieceReader, err error) {
|
||||
p.rAt = 0
|
||||
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *pieceReader) check() error {
|
||||
if p.closed {
|
||||
return xerrors.Errorf("reader closed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pieceReader) Close() error {
|
||||
if err := p.check(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if p.r != nil {
|
||||
if err := p.r.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
p.r = nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *pieceReader) Read(b []byte) (int, error) {
|
||||
if err := p.check(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n, err := p.ReadAt(b, p.seqAt)
|
||||
p.seqAt += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
|
||||
if err := p.check(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
p.seqAt = offset
|
||||
case io.SeekCurrent:
|
||||
p.seqAt += offset
|
||||
case io.SeekEnd:
|
||||
p.seqAt = int64(p.len) + offset
|
||||
default:
|
||||
return 0, xerrors.Errorf("bad whence")
|
||||
}
|
||||
|
||||
return p.seqAt, nil
|
||||
}
|
||||
|
||||
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
if err := p.check(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// 1. Get the backing reader into the correct position
|
||||
|
||||
// if the backing reader is ahead of the offset we want, or more than
|
||||
// MaxPieceReaderBurnBytes behind, reset the reader
|
||||
if p.r == nil || p.rAt > off || p.rAt+MaxPieceReaderBurnBytes < off {
|
||||
if p.r != nil {
|
||||
if err := p.r.Close(); err != nil {
|
||||
return 0, xerrors.Errorf("closing backing reader: %w", err)
|
||||
}
|
||||
p.r = nil
|
||||
}
|
||||
|
||||
log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt)
|
||||
|
||||
p.rAt = off
|
||||
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("getting backing reader: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Check if we need to burn some bytes
|
||||
if off > p.rAt {
|
||||
n, err := io.CopyN(io.Discard, p.r, off-p.rAt)
|
||||
p.rAt += n
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("discarding read gap: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Sanity check
|
||||
if off != p.rAt {
|
||||
return 0, xerrors.Errorf("bad reader offset; requested %d; at %d", off, p.rAt)
|
||||
}
|
||||
|
||||
// 4. Read!
|
||||
n, err = p.r.Read(b)
|
||||
p.rAt += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
var _ mount.Reader = (*pieceReader)(nil)
|
@ -2,13 +2,16 @@ package dagstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/dagstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/dagstore"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/testnodes"
|
||||
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
@ -93,7 +96,7 @@ func TestShardRegistration(t *testing.T) {
|
||||
cfg := config.DefaultStorageMiner().DAGStore
|
||||
cfg.RootDir = t.TempDir()
|
||||
|
||||
mapi := NewMinerAPI(ps, sa, 10)
|
||||
mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10)
|
||||
dagst, w, err := NewDAGStore(cfg, mapi)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, dagst)
|
||||
@ -119,3 +122,22 @@ func TestShardRegistration(t *testing.T) {
|
||||
|
||||
// ps.VerifyExpectations(t)
|
||||
}
|
||||
|
||||
type wrappedSA struct {
|
||||
retrievalmarket.SectorAccessor
|
||||
}
|
||||
|
||||
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if startOffset > 0 {
|
||||
if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil {
|
||||
return nil, xerrors.Errorf("discard start off: %w", err)
|
||||
}
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
||||
var _ SectorAccessor = &wrappedSA{}
|
||||
|
@ -8,16 +8,17 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/dagstore"
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/dagstore/shard"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
)
|
||||
|
||||
// TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found"
|
||||
@ -191,7 +192,7 @@ func (m mockLotusMount) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
|
||||
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
"github.com/filecoin-project/lotus/markets/dagstore"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
|
||||
@ -34,12 +35,16 @@ type sectorAccessor struct {
|
||||
|
||||
var _ retrievalmarket.SectorAccessor = (*sectorAccessor)(nil)
|
||||
|
||||
func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.SectorAccessor {
|
||||
func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) dagstore.SectorAccessor {
|
||||
return §orAccessor{address.Address(maddr), secb, pp, full}
|
||||
}
|
||||
|
||||
func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length)
|
||||
func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length)
|
||||
}
|
||||
|
||||
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length)
|
||||
si, err := sa.sectorsStatus(ctx, sectorID, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -64,8 +69,8 @@ func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorN
|
||||
}
|
||||
|
||||
// Get a reader for the piece, unsealing the piece if necessary
|
||||
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
|
||||
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(offset), length, si.Ticket.Value, commD)
|
||||
log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid)
|
||||
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
|
||||
}
|
||||
|
@ -155,7 +155,8 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
Override(DAGStoreKey, modules.DAGStore),
|
||||
|
||||
// Markets (retrieval)
|
||||
Override(new(retrievalmarket.SectorAccessor), sectoraccessor.NewSectorAccessor),
|
||||
Override(new(dagstore.SectorAccessor), sectoraccessor.NewSectorAccessor),
|
||||
Override(new(retrievalmarket.SectorAccessor), From(new(dagstore.SectorAccessor))),
|
||||
Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode),
|
||||
Override(new(rmnet.RetrievalMarketNetwork), modules.RetrievalNetwork),
|
||||
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
|
||||
|
@ -11,8 +11,6 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/dagstore"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
|
||||
mdagstore "github.com/filecoin-project/lotus/markets/dagstore"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
@ -25,7 +23,7 @@ const (
|
||||
)
|
||||
|
||||
// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts.
|
||||
func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor) (mdagstore.MinerAPI, error) {
|
||||
func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) {
|
||||
cfg, err := extractDAGStoreConfig(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user