dagstore mount: Add random access support

This commit is contained in:
Łukasz Magiera 2021-11-26 17:40:43 +01:00
parent 3b1d86b750
commit 8d955d5f30
13 changed files with 275 additions and 99 deletions

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -384,8 +385,8 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
} }
} }
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if offset != 0 { if uint64(offset)+startOffset != 0 {
panic("implme") panic("implme")
} }
@ -625,3 +626,4 @@ var MockProver = MockVerifier
var _ storage.Sealer = &SectorMgr{} var _ storage.Sealer = &SectorMgr{}
var _ ffiwrapper.Verifier = MockVerifier var _ ffiwrapper.Verifier = MockVerifier
var _ ffiwrapper.Prover = MockProver var _ ffiwrapper.Prover = MockProver
var _ sectorstorage.PieceProvider = &SectorMgr{}

View File

@ -23,7 +23,11 @@ type Unsealer interface {
type PieceProvider interface { type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) // pieceOffset + pieceSize specify piece bounds for unsealing (note: with SDR the entire sector will be unsealed by
// default in most cases, but this might matter with future PoRep)
// startOffset is added to the pieceOffset to get the starting reader offset.
// The number of bytes that can be read is pieceSize-startOffset
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
} }
@ -67,7 +71,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read. // It will NOT try to schedule an Unseal of a sealed sector file for the read.
// //
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers. // Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) { func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
// acquire a lock purely for reading unsealed sectors // acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
@ -78,7 +82,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
// Reader returns a reader for an unsealed piece at the given offset in the given sector. // Reader returns a reader for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing // The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece. // the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded()) r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded())
if err != nil { if err != nil {
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
cancel() cancel()
@ -97,20 +101,22 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal, // If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true. // the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false. // If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := offset.Valid(); err != nil { if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("offset is not valid: %w", err) return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
} }
if err := size.Validate(); err != nil { if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err) return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
} }
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size) startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err) log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
if xerrors.Is(err, storiface.ErrSectorNotFound) { if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
err = nil err = nil
} }
if err != nil { if err != nil {
@ -129,14 +135,14 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
if unsealed == cid.Undef { if unsealed == cid.Undef {
commd = nil commd = nil
} }
if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil { if err := p.uns.SectorsUnsealPiece(ctx, sector, pieceOffset, size, ticket, commd); err != nil {
log.Errorf("failed to SectorsUnsealPiece: %s", err) log.Errorf("failed to SectorsUnsealPiece: %s", err)
return nil, false, xerrors.Errorf("unsealing piece: %w", err) return nil, false, xerrors.Errorf("unsealing piece: %w", err)
} }
log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size) log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size) r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
if err != nil { if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err) log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err) return nil, true, xerrors.Errorf("read after unsealing: %w", err)
@ -145,9 +151,9 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Errorf("got no reader after unsealing piece") log.Errorf("got no reader after unsealing piece")
return nil, true, xerrors.Errorf("got no reader after unsealing piece") return nil, true, xerrors.Errorf("got no reader after unsealing piece")
} }
log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) log.Debugf("got a reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
} else { } else {
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size) log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
} }
upr, err := fr32.NewUnpadReader(r, size.Padded()) upr, err := fr32.NewUnpadReader(r, size.Padded())
@ -156,10 +162,17 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err) return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
} }
log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
return &funcCloser{ return &funcCloser{
Reader: bufio.NewReaderSize(upr, 127), Reader: bir,
close: func() error { close: func() error {
err = r.Close() err = r.Close()
unlock() unlock()

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -14,23 +15,31 @@ import (
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
) )
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI
type MinerAPI interface { type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
Start(ctx context.Context) error Start(ctx context.Context) error
} }
type SectorAccessor interface {
retrievalmarket.SectorAccessor
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
}
type minerAPI struct { type minerAPI struct {
pieceStore piecestore.PieceStore pieceStore piecestore.PieceStore
sa retrievalmarket.SectorAccessor sa SectorAccessor
throttle throttle.Throttler throttle throttle.Throttler
readyMgr *shared.ReadyManager readyMgr *shared.ReadyManager
} }
var _ MinerAPI = (*minerAPI)(nil) var _ MinerAPI = (*minerAPI)(nil)
func NewMinerAPI(store piecestore.PieceStore, sa retrievalmarket.SectorAccessor, concurrency int) MinerAPI { func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI {
return &minerAPI{ return &minerAPI{
pieceStore: store, pieceStore: store,
sa: sa, sa: sa,
@ -91,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
return false, nil return false, nil
} }
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
err := m.readyMgr.AwaitReady() err := m.readyMgr.AwaitReady()
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
// Throttle this path to avoid flooding the storage subsystem. // Throttle this path to avoid flooding the storage subsystem.
@ -105,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
}) })
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
} }
if len(pieceInfo.Deals) == 0 { if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid) return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
} }
// prefer an unsealed sector containing the piece if one exists // prefer an unsealed sector containing the piece if one exists
@ -127,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
return nil return nil
} }
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing. // Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
return err return err
}) })
@ -138,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
if reader != nil { if reader != nil {
// we were able to obtain a reader for an already unsealed piece // we were able to obtain a reader for an already unsealed piece
return reader, nil return reader, deal.Length.Unpadded(), nil
} }
} }
@ -149,7 +158,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
// block for a long time with the current PoRep // block for a long time with the current PoRep
// //
// This path is unthrottled. // This path is unthrottled.
reader, err := m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
if err != nil { if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error()) log.Warn(lastErr.Error())
@ -157,10 +166,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
} }
// Successfully fetched the deal data so return a reader over the data // Successfully fetched the deal data so return a reader over the data
return reader, nil return reader, deal.Length.Unpadded(), nil
} }
return nil, lastErr return nil, 0, lastErr
} }
func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {

View File

@ -87,7 +87,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
} }
// Fetch the piece // Fetch the piece
r, err := api.FetchUnsealedPiece(ctx, cid1) r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
if tc.expectErr { if tc.expectErr {
require.Error(t, err) require.Error(t, err)
return return
@ -159,7 +159,7 @@ func TestThrottle(t *testing.T) {
errgrp, ctx := errgroup.WithContext(context.Background()) errgrp, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
errgrp.Go(func() error { errgrp.Go(func() error {
r, err := api.FetchUnsealedPiece(ctx, cid1) r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
if err == nil { if err == nil {
_ = r.Close() _ = r.Close()
} }
@ -203,6 +203,10 @@ type mockRPN struct {
} }
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length)
}
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
atomic.AddInt32(&m.calls, 1) atomic.AddInt32(&m.calls, 1)
m.lk.RLock() m.lk.RLock()
defer m.lk.RUnlock() defer m.lk.RUnlock()
@ -211,7 +215,7 @@ func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, o
if !ok { if !ok {
panic("sector not found") panic("sector not found")
} }
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil
} }
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: lotusaccessor.go // Source: github.com/filecoin-project/lotus/markets/dagstore (interfaces: MinerAPI)
// Package mock_dagstore is a generated GoMock package. // Package mock_dagstore is a generated GoMock package.
package mock_dagstore package mock_dagstore
@ -9,88 +9,90 @@ import (
io "io" io "io"
reflect "reflect" reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
) )
// MockLotusAccessor is a mock of LotusAccessor interface. // MockMinerAPI is a mock of MinerAPI interface.
type MockLotusAccessor struct { type MockMinerAPI struct {
ctrl *gomock.Controller ctrl *gomock.Controller
recorder *MockLotusAccessorMockRecorder recorder *MockMinerAPIMockRecorder
} }
// MockLotusAccessorMockRecorder is the mock recorder for MockLotusAccessor. // MockMinerAPIMockRecorder is the mock recorder for MockMinerAPI.
type MockLotusAccessorMockRecorder struct { type MockMinerAPIMockRecorder struct {
mock *MockLotusAccessor mock *MockMinerAPI
} }
// NewMockLotusAccessor creates a new mock instance. // NewMockMinerAPI creates a new mock instance.
func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor { func NewMockMinerAPI(ctrl *gomock.Controller) *MockMinerAPI {
mock := &MockLotusAccessor{ctrl: ctrl} mock := &MockMinerAPI{ctrl: ctrl}
mock.recorder = &MockLotusAccessorMockRecorder{mock} mock.recorder = &MockMinerAPIMockRecorder{mock}
return mock return mock
} }
// EXPECT returns an object that allows the caller to indicate expected use. // EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder { func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder {
return m.recorder return m.recorder
} }
// FetchUnsealedPiece mocks base method. // FetchUnsealedPiece mocks base method.
func (m *MockLotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchUnsealedPiece", ctx, pieceCid) ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2)
ret0, _ := ret[0].(io.ReadCloser) ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(abi.UnpaddedPieceSize)
return ret0, ret1 ret2, _ := ret[2].(error)
return ret0, ret1, ret2
} }
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece. // FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
func (mr *MockLotusAccessorMockRecorder) FetchUnsealedPiece(ctx, pieceCid interface{}) *gomock.Call { func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockLotusAccessor)(nil).FetchUnsealedPiece), ctx, pieceCid) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2)
} }
// GetUnpaddedCARSize mocks base method. // GetUnpaddedCARSize mocks base method.
func (m *MockLotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { func (m *MockMinerAPI) GetUnpaddedCARSize(arg0 context.Context, arg1 cid.Cid) (uint64, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", ctx, pieceCid) ret := m.ctrl.Call(m, "GetUnpaddedCARSize", arg0, arg1)
ret0, _ := ret[0].(uint64) ret0, _ := ret[0].(uint64)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize. // GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize.
func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(ctx, pieceCid interface{}) *gomock.Call { func (mr *MockMinerAPIMockRecorder) GetUnpaddedCARSize(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), ctx, pieceCid) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockMinerAPI)(nil).GetUnpaddedCARSize), arg0, arg1)
} }
// IsUnsealed mocks base method. // IsUnsealed mocks base method.
func (m *MockLotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { func (m *MockMinerAPI) IsUnsealed(arg0 context.Context, arg1 cid.Cid) (bool, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsUnsealed", ctx, pieceCid) ret := m.ctrl.Call(m, "IsUnsealed", arg0, arg1)
ret0, _ := ret[0].(bool) ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// IsUnsealed indicates an expected call of IsUnsealed. // IsUnsealed indicates an expected call of IsUnsealed.
func (mr *MockLotusAccessorMockRecorder) IsUnsealed(ctx, pieceCid interface{}) *gomock.Call { func (mr *MockMinerAPIMockRecorder) IsUnsealed(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockLotusAccessor)(nil).IsUnsealed), ctx, pieceCid) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockMinerAPI)(nil).IsUnsealed), arg0, arg1)
} }
// Start mocks base method. // Start mocks base method.
func (m *MockLotusAccessor) Start(ctx context.Context) error { func (m *MockMinerAPI) Start(arg0 context.Context) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Start", ctx) ret := m.ctrl.Call(m, "Start", arg0)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// Start indicates an expected call of Start. // Start indicates an expected call of Start.
func (mr *MockLotusAccessorMockRecorder) Start(ctx interface{}) *gomock.Call { func (mr *MockMinerAPIMockRecorder) Start(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLotusAccessor)(nil).Start), ctx) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockMinerAPI)(nil).Start), arg0)
} }

View File

@ -2,7 +2,6 @@ package dagstore
import ( import (
"context" "context"
"io"
"net/url" "net/url"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -57,19 +56,19 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
} }
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
r, err := l.API.FetchUnsealedPiece(ctx, l.PieceCid) return (&pieceReader{
if err != nil { ctx: ctx,
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.PieceCid, err) api: l.API,
} pieceCid: l.PieceCid,
return &readCloser{r}, nil }).init()
} }
func (l *LotusMount) Info() mount.Info { func (l *LotusMount) Info() mount.Info {
return mount.Info{ return mount.Info{
Kind: mount.KindRemote, Kind: mount.KindRemote,
AccessSequential: true, AccessSequential: true,
AccessSeek: false, AccessSeek: true,
AccessRandom: false, AccessRandom: true,
} }
} }
@ -94,17 +93,3 @@ func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) {
Ready: isUnsealed, Ready: isUnsealed,
}, nil }, nil
} }
type readCloser struct {
io.ReadCloser
}
var _ mount.Reader = (*readCloser)(nil)
func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) {
return 0, xerrors.Errorf("ReadAt called but not implemented")
}
func (r *readCloser) Seek(offset int64, whence int) (int64, error) {
return 0, xerrors.Errorf("Seek called but not implemented")
}

View File

@ -2,6 +2,7 @@ package dagstore
import ( import (
"context" "context"
"github.com/filecoin-project/go-state-types/abi"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"strings" "strings"
@ -26,12 +27,12 @@ func TestLotusMount(t *testing.T) {
defer mockCtrl.Finish() defer mockCtrl.Finish()
// create a mock lotus api that returns the reader we want // create a mock lotus api that returns the reader we want
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl) mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1) mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1) mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
mnt, err := NewLotusMount(cid, mockLotusMountAPI) mnt, err := NewLotusMount(cid, mockLotusMountAPI)
@ -109,7 +110,7 @@ func TestLotusMountRegistration(t *testing.T) {
// when test is done, assert expectations on all mock objects. // when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish() defer mockCtrl.Finish()
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl) mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
registry := mount.NewRegistry() registry := mount.NewRegistry()
err = registry.Register(lotusScheme, mountTemplate(mockLotusMountAPI)) err = registry.Register(lotusScheme, mountTemplate(mockLotusMountAPI))
require.NoError(t, err) require.NoError(t, err)

View File

@ -0,0 +1,133 @@
package dagstore
import (
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
)
// for small read skips, it's faster to "burn" some bytes than to setup new
// sector reader
var MaxPieceReaderBurnBytes int64 = 512 << 10 // 512k
type pieceReader struct {
ctx context.Context
api MinerAPI
pieceCid cid.Cid
len abi.UnpaddedPieceSize
closed bool
seqAt int64 // next byte to be read by io.Reader
r io.ReadCloser
rAt int64
}
func (p *pieceReader) init() (_ *pieceReader, err error) {
p.rAt = 0
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
if err != nil {
return nil, err
}
return p, nil
}
func (p *pieceReader) check() error {
if p.closed {
return xerrors.Errorf("reader closed")
}
return nil
}
func (p *pieceReader) Close() error {
if err := p.check(); err != nil {
return err
}
if p.r != nil {
if err := p.r.Close(); err != nil {
return err
}
p.r = nil
}
return nil
}
func (p *pieceReader) Read(b []byte) (int, error) {
if err := p.check(); err != nil {
return 0, err
}
n, err := p.ReadAt(b, p.seqAt)
p.seqAt += int64(n)
return n, err
}
func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
if err := p.check(); err != nil {
return 0, err
}
switch whence {
case io.SeekStart:
p.seqAt = offset
case io.SeekCurrent:
p.seqAt += offset
case io.SeekEnd:
p.seqAt = int64(p.len) + offset
default:
return 0, xerrors.Errorf("bad whence")
}
return p.seqAt, nil
}
func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
if err := p.check(); err != nil {
return 0, err
}
// get the backing reader into the correct position
if p.r == nil {
p.rAt = MaxPieceReaderBurnBytes * -2
}
// if the backing reader is ahead of the offset we want, or more than
// MaxPieceReaderBurnBytes behind, reset the reader
if p.rAt > off || p.rAt+MaxPieceReaderBurnBytes < off {
if p.r != nil {
if err := p.r.Close(); err != nil {
return 0, xerrors.Errorf("closing backing reader: %w", err)
}
p.r = nil
}
p.rAt = off
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
if err != nil {
return 0, xerrors.Errorf("getting backing reader: %w", err)
}
}
// check if we need to burn some bytes
if off > p.rAt {
if _, err := io.CopyN(io.Discard, p.r, p.rAt-off); err != nil {
return 0, xerrors.Errorf("discarding read gap: %w", err)
}
}
// Read!
n, err = p.r.Read(b)
p.rAt += int64(n)
return n, err
}
var _ mount.Reader = (*pieceReader)(nil)

View File

@ -2,6 +2,9 @@ package dagstore
import ( import (
"context" "context"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"golang.org/x/xerrors"
"io"
"testing" "testing"
"github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore"
@ -93,7 +96,7 @@ func TestShardRegistration(t *testing.T) {
cfg := config.DefaultStorageMiner().DAGStore cfg := config.DefaultStorageMiner().DAGStore
cfg.RootDir = t.TempDir() cfg.RootDir = t.TempDir()
mapi := NewMinerAPI(ps, sa, 10) mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10)
dagst, w, err := NewDAGStore(cfg, mapi) dagst, w, err := NewDAGStore(cfg, mapi)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, dagst) require.NotNil(t, dagst)
@ -119,3 +122,22 @@ func TestShardRegistration(t *testing.T) {
// ps.VerifyExpectations(t) // ps.VerifyExpectations(t)
} }
type wrappedSA struct {
retrievalmarket.SectorAccessor
}
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length)
if err != nil {
return nil, err
}
if startOffset > 0 {
if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil {
return nil, xerrors.Errorf("discard start off: %w", err)
}
}
return r, err
}
var _ SectorAccessor = &wrappedSA{}

View File

@ -3,6 +3,7 @@ package dagstore
import ( import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/go-state-types/abi"
"io" "io"
"os" "os"
"testing" "testing"
@ -191,7 +192,7 @@ func (m mockLotusMount) Start(ctx context.Context) error {
return nil return nil
} }
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
panic("implement me") panic("implement me")
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
@ -34,12 +35,16 @@ type sectorAccessor struct {
var _ retrievalmarket.SectorAccessor = (*sectorAccessor)(nil) var _ retrievalmarket.SectorAccessor = (*sectorAccessor)(nil)
func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.SectorAccessor { func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) dagstore.SectorAccessor {
return &sectorAccessor{address.Address(maddr), secb, pp, full} return &sectorAccessor{address.Address(maddr), secb, pp, full}
} }
func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length) return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length)
}
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length)
si, err := sa.sectorsStatus(ctx, sectorID, false) si, err := sa.sectorsStatus(ctx, sectorID, false)
if err != nil { if err != nil {
return nil, err return nil, err
@ -64,8 +69,8 @@ func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorN
} }
// Get a reader for the piece, unsealing the piece if necessary // Get a reader for the piece, unsealing the piece if necessary
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid) log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid)
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(offset), length, si.Ticket.Value, commD) r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err) return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
} }

View File

@ -155,7 +155,8 @@ func ConfigStorageMiner(c interface{}) Option {
Override(DAGStoreKey, modules.DAGStore), Override(DAGStoreKey, modules.DAGStore),
// Markets (retrieval) // Markets (retrieval)
Override(new(retrievalmarket.SectorAccessor), sectoraccessor.NewSectorAccessor), Override(new(dagstore.SectorAccessor), sectoraccessor.NewSectorAccessor),
Override(new(retrievalmarket.SectorAccessor), From(new(dagstore.SectorAccessor))),
Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode), Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode),
Override(new(rmnet.RetrievalMarketNetwork), modules.RetrievalNetwork), Override(new(rmnet.RetrievalMarketNetwork), modules.RetrievalNetwork),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),

View File

@ -11,8 +11,6 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
mdagstore "github.com/filecoin-project/lotus/markets/dagstore" mdagstore "github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -25,7 +23,7 @@ const (
) )
// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts. // NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts.
func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor) (mdagstore.MinerAPI, error) { func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) {
cfg, err := extractDAGStoreConfig(r) cfg, err := extractDAGStoreConfig(r)
if err != nil { if err != nil {
return nil, err return nil, err