From 8d955d5f3022ad1fbb95a7c1adb6add2cdb6ea26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Nov 2021 17:40:43 +0100 Subject: [PATCH 1/9] dagstore mount: Add random access support --- extern/sector-storage/mock/mock.go | 6 +- extern/sector-storage/piece_provider.go | 43 ++++-- markets/dagstore/miner_api.go | 33 +++-- markets/dagstore/miner_api_test.go | 10 +- markets/dagstore/mocks/mock_lotus_accessor.go | 62 ++++---- markets/dagstore/mount.go | 29 +--- markets/dagstore/mount_test.go | 9 +- markets/dagstore/piecereader.go | 133 ++++++++++++++++++ markets/dagstore/wrapper_migration_test.go | 24 +++- markets/dagstore/wrapper_test.go | 3 +- markets/sectoraccessor/sectoraccessor.go | 15 +- node/builder_miner.go | 3 +- node/modules/storageminer_dagstore.go | 4 +- 13 files changed, 275 insertions(+), 99 deletions(-) create mode 100644 markets/dagstore/piecereader.go diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index 273f0928e..d3a18c15a 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -5,6 +5,7 @@ import ( "context" "crypto/sha256" "fmt" + sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "io" "io/ioutil" "math/rand" @@ -384,8 +385,8 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea } } -func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { - if offset != 0 { +func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { + if uint64(offset)+startOffset != 0 { panic("implme") } @@ -625,3 +626,4 @@ var MockProver = MockVerifier var _ storage.Sealer = &SectorMgr{} var _ ffiwrapper.Verifier = MockVerifier var _ ffiwrapper.Prover = MockProver +var _ sectorstorage.PieceProvider = &SectorMgr{} diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index ad3a2543e..bd99e42bc 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -23,7 +23,11 @@ type Unsealer interface { type PieceProvider interface { // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector - ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) + // pieceOffset + pieceSize specify piece bounds for unsealing (note: with SDR the entire sector will be unsealed by + // default in most cases, but this might matter with future PoRep) + // startOffset is added to the pieceOffset to get the starting reader offset. + // The number of bytes that can be read is pieceSize-startOffset + ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) } @@ -67,7 +71,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef // It will NOT try to schedule an Unseal of a sealed sector file for the read. // // Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers. -func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) { +func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) { // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { @@ -78,7 +82,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage // Reader returns a reader for an unsealed piece at the given offset in the given sector. // The returned reader will be nil if none of the workers has an unsealed sector file containing // the unsealed piece. - r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded()) + r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()) if err != nil { log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) cancel() @@ -97,20 +101,22 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage // If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal, // the returned boolean parameter will be set to true. // If we have an existing unsealed file containing the given piece, the returned boolean will be set to false. -func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { - if err := offset.Valid(); err != nil { - return nil, false, xerrors.Errorf("offset is not valid: %w", err) +func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { + if err := pieceOffset.Valid(); err != nil { + return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err) } if err := size.Validate(); err != nil { return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err) } - r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size) + startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127 + + r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size) log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err) if xerrors.Is(err, storiface.ErrSectorNotFound) { - log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) + log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) err = nil } if err != nil { @@ -129,14 +135,14 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, if unsealed == cid.Undef { commd = nil } - if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil { + if err := p.uns.SectorsUnsealPiece(ctx, sector, pieceOffset, size, ticket, commd); err != nil { log.Errorf("failed to SectorsUnsealPiece: %s", err) return nil, false, xerrors.Errorf("unsealing piece: %w", err) } - log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size) + log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) - r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size) + r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size) if err != nil { log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err) return nil, true, xerrors.Errorf("read after unsealing: %w", err) @@ -145,9 +151,9 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, log.Errorf("got no reader after unsealing piece") return nil, true, xerrors.Errorf("got no reader after unsealing piece") } - log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) + log.Debugf("got a reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) } else { - log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size) + log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) } upr, err := fr32.NewUnpadReader(r, size.Padded()) @@ -156,10 +162,17 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err) } - log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size) + log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size) + + bir := bufio.NewReaderSize(upr, 127) + if startOffset > uint64(startOffsetAligned) { + if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil { + return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err) + } + } return &funcCloser{ - Reader: bufio.NewReaderSize(upr, 127), + Reader: bir, close: func() error { err = r.Close() unlock() diff --git a/markets/dagstore/miner_api.go b/markets/dagstore/miner_api.go index afe623eb2..d59a05846 100644 --- a/markets/dagstore/miner_api.go +++ b/markets/dagstore/miner_api.go @@ -6,6 +6,7 @@ import ( "io" "github.com/filecoin-project/dagstore/throttle" + "github.com/filecoin-project/go-state-types/abi" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -14,23 +15,31 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) +//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI + type MinerAPI interface { - FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) + FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) Start(ctx context.Context) error } +type SectorAccessor interface { + retrievalmarket.SectorAccessor + + UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) +} + type minerAPI struct { pieceStore piecestore.PieceStore - sa retrievalmarket.SectorAccessor + sa SectorAccessor throttle throttle.Throttler readyMgr *shared.ReadyManager } var _ MinerAPI = (*minerAPI)(nil) -func NewMinerAPI(store piecestore.PieceStore, sa retrievalmarket.SectorAccessor, concurrency int) MinerAPI { +func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI { return &minerAPI{ pieceStore: store, sa: sa, @@ -91,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro return false, nil } -func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { +func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { err := m.readyMgr.AwaitReady() if err != nil { - return nil, err + return nil, 0, err } // Throttle this path to avoid flooding the storage subsystem. @@ -105,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io }) if err != nil { - return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) + return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) } if len(pieceInfo.Deals) == 0 { - return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid) + return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid) } // prefer an unsealed sector containing the piece if one exists @@ -127,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io return nil } // Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing. - reader, err = m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded()) return err }) @@ -138,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io if reader != nil { // we were able to obtain a reader for an already unsealed piece - return reader, nil + return reader, deal.Length.Unpadded(), nil } } @@ -149,7 +158,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io // block for a long time with the current PoRep // // This path is unthrottled. - reader, err := m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded()) if err != nil { lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) log.Warn(lastErr.Error()) @@ -157,10 +166,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io } // Successfully fetched the deal data so return a reader over the data - return reader, nil + return reader, deal.Length.Unpadded(), nil } - return nil, lastErr + return nil, 0, lastErr } func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { diff --git a/markets/dagstore/miner_api_test.go b/markets/dagstore/miner_api_test.go index 4a61c62a8..38c3a4fc3 100644 --- a/markets/dagstore/miner_api_test.go +++ b/markets/dagstore/miner_api_test.go @@ -87,7 +87,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { } // Fetch the piece - r, err := api.FetchUnsealedPiece(ctx, cid1) + r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0) if tc.expectErr { require.Error(t, err) return @@ -159,7 +159,7 @@ func TestThrottle(t *testing.T) { errgrp, ctx := errgroup.WithContext(context.Background()) for i := 0; i < 10; i++ { errgrp.Go(func() error { - r, err := api.FetchUnsealedPiece(ctx, cid1) + r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0) if err == nil { _ = r.Close() } @@ -203,6 +203,10 @@ type mockRPN struct { } func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + return m.UnsealSectorAt(ctx, sectorID, offset, 0, length) +} + +func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { atomic.AddInt32(&m.calls, 1) m.lk.RLock() defer m.lk.RUnlock() @@ -211,7 +215,7 @@ func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, o if !ok { panic("sector not found") } - return io.NopCloser(bytes.NewBuffer([]byte(data))), nil + return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil } func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { diff --git a/markets/dagstore/mocks/mock_lotus_accessor.go b/markets/dagstore/mocks/mock_lotus_accessor.go index 2e19b4482..e10a1b053 100644 --- a/markets/dagstore/mocks/mock_lotus_accessor.go +++ b/markets/dagstore/mocks/mock_lotus_accessor.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: lotusaccessor.go +// Source: github.com/filecoin-project/lotus/markets/dagstore (interfaces: MinerAPI) // Package mock_dagstore is a generated GoMock package. package mock_dagstore @@ -9,88 +9,90 @@ import ( io "io" reflect "reflect" + abi "github.com/filecoin-project/go-state-types/abi" gomock "github.com/golang/mock/gomock" cid "github.com/ipfs/go-cid" ) -// MockLotusAccessor is a mock of LotusAccessor interface. -type MockLotusAccessor struct { +// MockMinerAPI is a mock of MinerAPI interface. +type MockMinerAPI struct { ctrl *gomock.Controller - recorder *MockLotusAccessorMockRecorder + recorder *MockMinerAPIMockRecorder } -// MockLotusAccessorMockRecorder is the mock recorder for MockLotusAccessor. -type MockLotusAccessorMockRecorder struct { - mock *MockLotusAccessor +// MockMinerAPIMockRecorder is the mock recorder for MockMinerAPI. +type MockMinerAPIMockRecorder struct { + mock *MockMinerAPI } -// NewMockLotusAccessor creates a new mock instance. -func NewMockLotusAccessor(ctrl *gomock.Controller) *MockLotusAccessor { - mock := &MockLotusAccessor{ctrl: ctrl} - mock.recorder = &MockLotusAccessorMockRecorder{mock} +// NewMockMinerAPI creates a new mock instance. +func NewMockMinerAPI(ctrl *gomock.Controller) *MockMinerAPI { + mock := &MockMinerAPI{ctrl: ctrl} + mock.recorder = &MockMinerAPIMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockLotusAccessor) EXPECT() *MockLotusAccessorMockRecorder { +func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder { return m.recorder } // FetchUnsealedPiece mocks base method. -func (m *MockLotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { +func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchUnsealedPiece", ctx, pieceCid) + ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2) ret0, _ := ret[0].(io.ReadCloser) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(abi.UnpaddedPieceSize) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece. -func (mr *MockLotusAccessorMockRecorder) FetchUnsealedPiece(ctx, pieceCid interface{}) *gomock.Call { +func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockLotusAccessor)(nil).FetchUnsealedPiece), ctx, pieceCid) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2) } // GetUnpaddedCARSize mocks base method. -func (m *MockLotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { +func (m *MockMinerAPI) GetUnpaddedCARSize(arg0 context.Context, arg1 cid.Cid) (uint64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetUnpaddedCARSize", ctx, pieceCid) + ret := m.ctrl.Call(m, "GetUnpaddedCARSize", arg0, arg1) ret0, _ := ret[0].(uint64) ret1, _ := ret[1].(error) return ret0, ret1 } // GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize. -func (mr *MockLotusAccessorMockRecorder) GetUnpaddedCARSize(ctx, pieceCid interface{}) *gomock.Call { +func (mr *MockMinerAPIMockRecorder) GetUnpaddedCARSize(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusAccessor)(nil).GetUnpaddedCARSize), ctx, pieceCid) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockMinerAPI)(nil).GetUnpaddedCARSize), arg0, arg1) } // IsUnsealed mocks base method. -func (m *MockLotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { +func (m *MockMinerAPI) IsUnsealed(arg0 context.Context, arg1 cid.Cid) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsUnsealed", ctx, pieceCid) + ret := m.ctrl.Call(m, "IsUnsealed", arg0, arg1) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // IsUnsealed indicates an expected call of IsUnsealed. -func (mr *MockLotusAccessorMockRecorder) IsUnsealed(ctx, pieceCid interface{}) *gomock.Call { +func (mr *MockMinerAPIMockRecorder) IsUnsealed(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockLotusAccessor)(nil).IsUnsealed), ctx, pieceCid) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnsealed", reflect.TypeOf((*MockMinerAPI)(nil).IsUnsealed), arg0, arg1) } // Start mocks base method. -func (m *MockLotusAccessor) Start(ctx context.Context) error { +func (m *MockMinerAPI) Start(arg0 context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Start", ctx) + ret := m.ctrl.Call(m, "Start", arg0) ret0, _ := ret[0].(error) return ret0 } // Start indicates an expected call of Start. -func (mr *MockLotusAccessorMockRecorder) Start(ctx interface{}) *gomock.Call { +func (mr *MockMinerAPIMockRecorder) Start(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLotusAccessor)(nil).Start), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockMinerAPI)(nil).Start), arg0) } diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index c97dcbf86..e4e6242d7 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -2,7 +2,6 @@ package dagstore import ( "context" - "io" "net/url" "github.com/ipfs/go-cid" @@ -57,19 +56,19 @@ func (l *LotusMount) Deserialize(u *url.URL) error { } func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { - r, err := l.API.FetchUnsealedPiece(ctx, l.PieceCid) - if err != nil { - return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.PieceCid, err) - } - return &readCloser{r}, nil + return (&pieceReader{ + ctx: ctx, + api: l.API, + pieceCid: l.PieceCid, + }).init() } func (l *LotusMount) Info() mount.Info { return mount.Info{ Kind: mount.KindRemote, AccessSequential: true, - AccessSeek: false, - AccessRandom: false, + AccessSeek: true, + AccessRandom: true, } } @@ -94,17 +93,3 @@ func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) { Ready: isUnsealed, }, nil } - -type readCloser struct { - io.ReadCloser -} - -var _ mount.Reader = (*readCloser)(nil) - -func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) { - return 0, xerrors.Errorf("ReadAt called but not implemented") -} - -func (r *readCloser) Seek(offset int64, whence int) (int64, error) { - return 0, xerrors.Errorf("Seek called but not implemented") -} diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 09b255d6a..d4b4fbbff 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -2,6 +2,7 @@ package dagstore import ( "context" + "github.com/filecoin-project/go-state-types/abi" "io/ioutil" "net/url" "strings" @@ -26,12 +27,12 @@ func TestLotusMount(t *testing.T) { defer mockCtrl.Finish() // create a mock lotus api that returns the reader we want - mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl) + mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl) mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1) - mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) - mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) + mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1) + mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1) mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1) mnt, err := NewLotusMount(cid, mockLotusMountAPI) @@ -109,7 +110,7 @@ func TestLotusMountRegistration(t *testing.T) { // when test is done, assert expectations on all mock objects. defer mockCtrl.Finish() - mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl) + mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl) registry := mount.NewRegistry() err = registry.Register(lotusScheme, mountTemplate(mockLotusMountAPI)) require.NoError(t, err) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go new file mode 100644 index 000000000..b76c24cea --- /dev/null +++ b/markets/dagstore/piecereader.go @@ -0,0 +1,133 @@ +package dagstore + +import ( + "context" + "io" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/go-state-types/abi" +) + +// for small read skips, it's faster to "burn" some bytes than to setup new +// sector reader +var MaxPieceReaderBurnBytes int64 = 512 << 10 // 512k + +type pieceReader struct { + ctx context.Context + api MinerAPI + pieceCid cid.Cid + len abi.UnpaddedPieceSize + + closed bool + seqAt int64 // next byte to be read by io.Reader + + r io.ReadCloser + rAt int64 +} + +func (p *pieceReader) init() (_ *pieceReader, err error) { + p.rAt = 0 + p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) + if err != nil { + return nil, err + } + + return p, nil +} + +func (p *pieceReader) check() error { + if p.closed { + return xerrors.Errorf("reader closed") + } + + return nil +} + +func (p *pieceReader) Close() error { + if err := p.check(); err != nil { + return err + } + + if p.r != nil { + if err := p.r.Close(); err != nil { + return err + } + p.r = nil + } + + return nil +} + +func (p *pieceReader) Read(b []byte) (int, error) { + if err := p.check(); err != nil { + return 0, err + } + + n, err := p.ReadAt(b, p.seqAt) + p.seqAt += int64(n) + return n, err +} + +func (p *pieceReader) Seek(offset int64, whence int) (int64, error) { + if err := p.check(); err != nil { + return 0, err + } + + switch whence { + case io.SeekStart: + p.seqAt = offset + case io.SeekCurrent: + p.seqAt += offset + case io.SeekEnd: + p.seqAt = int64(p.len) + offset + default: + return 0, xerrors.Errorf("bad whence") + } + + return p.seqAt, nil +} + +func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { + if err := p.check(); err != nil { + return 0, err + } + + // get the backing reader into the correct position + if p.r == nil { + p.rAt = MaxPieceReaderBurnBytes * -2 + } + + // if the backing reader is ahead of the offset we want, or more than + // MaxPieceReaderBurnBytes behind, reset the reader + if p.rAt > off || p.rAt+MaxPieceReaderBurnBytes < off { + if p.r != nil { + if err := p.r.Close(); err != nil { + return 0, xerrors.Errorf("closing backing reader: %w", err) + } + p.r = nil + } + + p.rAt = off + p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) + if err != nil { + return 0, xerrors.Errorf("getting backing reader: %w", err) + } + } + + // check if we need to burn some bytes + if off > p.rAt { + if _, err := io.CopyN(io.Discard, p.r, p.rAt-off); err != nil { + return 0, xerrors.Errorf("discarding read gap: %w", err) + } + } + + // Read! + n, err = p.r.Read(b) + p.rAt += int64(n) + return n, err +} + +var _ mount.Reader = (*pieceReader)(nil) diff --git a/markets/dagstore/wrapper_migration_test.go b/markets/dagstore/wrapper_migration_test.go index 13d8db876..f38acec4d 100644 --- a/markets/dagstore/wrapper_migration_test.go +++ b/markets/dagstore/wrapper_migration_test.go @@ -2,6 +2,9 @@ package dagstore import ( "context" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "golang.org/x/xerrors" + "io" "testing" "github.com/filecoin-project/dagstore" @@ -93,7 +96,7 @@ func TestShardRegistration(t *testing.T) { cfg := config.DefaultStorageMiner().DAGStore cfg.RootDir = t.TempDir() - mapi := NewMinerAPI(ps, sa, 10) + mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10) dagst, w, err := NewDAGStore(cfg, mapi) require.NoError(t, err) require.NotNil(t, dagst) @@ -119,3 +122,22 @@ func TestShardRegistration(t *testing.T) { // ps.VerifyExpectations(t) } + +type wrappedSA struct { + retrievalmarket.SectorAccessor +} + +func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length) + if err != nil { + return nil, err + } + if startOffset > 0 { + if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil { + return nil, xerrors.Errorf("discard start off: %w", err) + } + } + return r, err +} + +var _ SectorAccessor = &wrappedSA{} diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 9d3e6939e..23cd84cac 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -3,6 +3,7 @@ package dagstore import ( "bytes" "context" + "github.com/filecoin-project/go-state-types/abi" "io" "os" "testing" @@ -191,7 +192,7 @@ func (m mockLotusMount) Start(ctx context.Context) error { return nil } -func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { +func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { panic("implement me") } diff --git a/markets/sectoraccessor/sectoraccessor.go b/markets/sectoraccessor/sectoraccessor.go index 1304a3a00..f70aca103 100644 --- a/markets/sectoraccessor/sectoraccessor.go +++ b/markets/sectoraccessor/sectoraccessor.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" + "github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/sectorblocks" @@ -34,12 +35,16 @@ type sectorAccessor struct { var _ retrievalmarket.SectorAccessor = (*sectorAccessor)(nil) -func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.SectorAccessor { +func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilder, pp sectorstorage.PieceProvider, full v1api.FullNode) dagstore.SectorAccessor { return §orAccessor{address.Address(maddr), secb, pp, full} } -func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { - log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length) +func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length) +} + +func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length) si, err := sa.sectorsStatus(ctx, sectorID, false) if err != nil { return nil, err @@ -64,8 +69,8 @@ func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorN } // Get a reader for the piece, unsealing the piece if necessary - log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid) - r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(offset), length, si.Ticket.Value, commD) + log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid) + r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD) if err != nil { return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err) } diff --git a/node/builder_miner.go b/node/builder_miner.go index 3447eb3e6..74b0c5558 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -155,7 +155,8 @@ func ConfigStorageMiner(c interface{}) Option { Override(DAGStoreKey, modules.DAGStore), // Markets (retrieval) - Override(new(retrievalmarket.SectorAccessor), sectoraccessor.NewSectorAccessor), + Override(new(dagstore.SectorAccessor), sectoraccessor.NewSectorAccessor), + Override(new(retrievalmarket.SectorAccessor), From(new(dagstore.SectorAccessor))), Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode), Override(new(rmnet.RetrievalMarketNetwork), modules.RetrievalNetwork), Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index 1f72a49b9..b4f5d3535 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -11,8 +11,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/dagstore" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" - mdagstore "github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -25,7 +23,7 @@ const ( ) // NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts. -func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa retrievalmarket.SectorAccessor) (mdagstore.MinerAPI, error) { +func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) { cfg, err := extractDAGStoreConfig(r) if err != nil { return nil, err From a9ee2636825522223448c41cb9a6acf3e76e77ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Nov 2021 18:01:09 +0100 Subject: [PATCH 2/9] Fix dagstore pieceReader burn logic --- markets/dagstore/piecereader.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index b76c24cea..9b09c50cf 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -119,11 +119,18 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { // check if we need to burn some bytes if off > p.rAt { - if _, err := io.CopyN(io.Discard, p.r, p.rAt-off); err != nil { + n, err := io.CopyN(io.Discard, p.r, off-p.rAt) + p.rAt += n + if err != nil { return 0, xerrors.Errorf("discarding read gap: %w", err) } } + // sanity check + if off != p.rAt { + return 0, xerrors.Errorf("bad reader offset; requested %d; at %d", off, p.rAt) + } + // Read! n, err = p.r.Read(b) p.rAt += int64(n) From f6de16e95afc31b06c7e71e184e781f79cbd9649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Nov 2021 18:16:53 +0100 Subject: [PATCH 3/9] Fix sector-storage tests --- extern/sector-storage/mock/mock.go | 2 -- extern/sector-storage/piece_provider_test.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index d3a18c15a..3acf3dc41 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -5,7 +5,6 @@ import ( "context" "crypto/sha256" "fmt" - sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "io" "io/ioutil" "math/rand" @@ -626,4 +625,3 @@ var MockProver = MockVerifier var _ storage.Sealer = &SectorMgr{} var _ ffiwrapper.Verifier = MockVerifier var _ ffiwrapper.Prover = MockProver -var _ sectorstorage.PieceProvider = &SectorMgr{} diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go index eb3ffa7c3..0abba1bd8 100644 --- a/extern/sector-storage/piece_provider_test.go +++ b/extern/sector-storage/piece_provider_test.go @@ -337,7 +337,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, expectedHadToUnseal bool, expectedBytes []byte) { - rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD) + rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD) require.NoError(t, err) require.NotNil(t, rd) require.Equal(t, expectedHadToUnseal, isUnsealed) From abec7dd3bc8601bd40538c8d2efb02bf1d1bcf54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Nov 2021 18:40:27 +0100 Subject: [PATCH 4/9] Fix imports --- markets/dagstore/mount_test.go | 2 +- markets/dagstore/wrapper_migration_test.go | 6 +++--- markets/dagstore/wrapper_test.go | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index d4b4fbbff..ec5e8a086 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -2,7 +2,6 @@ package dagstore import ( "context" - "github.com/filecoin-project/go-state-types/abi" "io/ioutil" "net/url" "strings" @@ -13,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/go-state-types/abi" mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks" ) diff --git a/markets/dagstore/wrapper_migration_test.go b/markets/dagstore/wrapper_migration_test.go index f38acec4d..502105499 100644 --- a/markets/dagstore/wrapper_migration_test.go +++ b/markets/dagstore/wrapper_migration_test.go @@ -2,16 +2,16 @@ package dagstore import ( "context" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "golang.org/x/xerrors" "io" "testing" - "github.com/filecoin-project/dagstore" "github.com/stretchr/testify/require" + "golang.org/x/xerrors" + "github.com/filecoin-project/dagstore" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/testnodes" tut "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-fil-markets/storagemarket" diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 23cd84cac..a4a6215e1 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -3,22 +3,22 @@ package dagstore import ( "bytes" "context" - "github.com/filecoin-project/go-state-types/abi" "io" "os" "testing" "time" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/shard" - "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" + "github.com/filecoin-project/lotus/node/config" ) // TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found" From 743ce5a40f15365f645ab699ca9ac79a843109e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Nov 2021 18:48:52 +0100 Subject: [PATCH 5/9] Add startOffset support to mock SectorMgr.ReadPiece --- extern/sector-storage/mock/mock.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index 3acf3dc41..eeb1404ad 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -385,11 +385,11 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea } func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { - if uint64(offset)+startOffset != 0 { + if uint64(offset) != 0 { panic("implme") } - return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil + return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil } func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) { From 331702cd951e83e144cc2dd37bbb1c5e8c0dbcad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Nov 2021 18:49:41 +0100 Subject: [PATCH 6/9] Tweak MaxPieceReaderBurnBytes --- markets/dagstore/piecereader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index 9b09c50cf..2f6f6fef0 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -11,9 +11,9 @@ import ( "github.com/filecoin-project/go-state-types/abi" ) -// for small read skips, it's faster to "burn" some bytes than to setup new -// sector reader -var MaxPieceReaderBurnBytes int64 = 512 << 10 // 512k +// For small read skips, it's faster to "burn" some bytes than to setup new sector reader. +// Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB. +var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M type pieceReader struct { ctx context.Context From 9110e6f63273e1fe071ce64263d7e09271923b92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Nov 2021 20:21:09 +0100 Subject: [PATCH 7/9] dagstore pieceReader: add debug log on stream restart --- markets/dagstore/piecereader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index 2f6f6fef0..82d1f50e4 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -110,6 +110,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { p.r = nil } + log.Debugw("pieceReader new stream", "at", p.rAt, "off", off-p.rAt) + p.rAt = off p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) if err != nil { From 05aa8604591fd8dcef169ee0a783b7b0ea9e509f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 27 Nov 2021 00:05:45 +0100 Subject: [PATCH 8/9] Request correct read size with startOffset in pieceProvider --- extern/sector-storage/piece_provider.go | 2 +- markets/dagstore/piecereader.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index bd99e42bc..aa2ef0d0d 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -82,7 +82,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage // Reader returns a reader for an unsealed piece at the given offset in the given sector. // The returned reader will be nil if none of the workers has an unsealed sector file containing // the unsealed piece. - r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()) + r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded())) if err != nil { log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) cancel() diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index 82d1f50e4..37c15d60f 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -110,7 +110,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { p.r = nil } - log.Debugw("pieceReader new stream", "at", p.rAt, "off", off-p.rAt) + log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt) p.rAt = off p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) From 4bcde2f0ff199445e4df6ae6ea32ad5dc1984886 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 29 Nov 2021 15:32:27 +0100 Subject: [PATCH 9/9] dagstore pieceReader: Cleanup reader nil check --- markets/dagstore/piecereader.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index 37c15d60f..6ef69dfbe 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -95,14 +95,11 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { return 0, err } - // get the backing reader into the correct position - if p.r == nil { - p.rAt = MaxPieceReaderBurnBytes * -2 - } + // 1. Get the backing reader into the correct position // if the backing reader is ahead of the offset we want, or more than // MaxPieceReaderBurnBytes behind, reset the reader - if p.rAt > off || p.rAt+MaxPieceReaderBurnBytes < off { + if p.r == nil || p.rAt > off || p.rAt+MaxPieceReaderBurnBytes < off { if p.r != nil { if err := p.r.Close(); err != nil { return 0, xerrors.Errorf("closing backing reader: %w", err) @@ -119,7 +116,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { } } - // check if we need to burn some bytes + // 2. Check if we need to burn some bytes if off > p.rAt { n, err := io.CopyN(io.Discard, p.r, off-p.rAt) p.rAt += n @@ -128,12 +125,12 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { } } - // sanity check + // 3. Sanity check if off != p.rAt { return 0, xerrors.Errorf("bad reader offset; requested %d; at %d", off, p.rAt) } - // Read! + // 4. Read! n, err = p.r.Read(b) p.rAt += int64(n) return n, err