unit tests for the remote store Reader
This commit is contained in:
parent
9b34494501
commit
bd9959070e
@ -361,7 +361,8 @@ var runCmd = &cli.Command{
|
||||
return xerrors.Errorf("could not get api info: %w", err)
|
||||
}
|
||||
|
||||
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"))
|
||||
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"),
|
||||
&stores.DefaultPartialFileHandler{})
|
||||
|
||||
fh := &stores.FetchHandler{Local: localStore, PfHandler: &stores.DefaultPartialFileHandler{}}
|
||||
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -460,7 +460,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stor := stores.NewRemote(lstor, si, http.Header(sa), 10)
|
||||
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})
|
||||
|
||||
smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
|
||||
ParallelFetchLimit: 10,
|
||||
|
2
extern/sector-storage/manager_test.go
vendored
2
extern/sector-storage/manager_test.go
vendored
@ -98,7 +98,7 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
|
||||
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
|
||||
require.NoError(t, err)
|
||||
|
||||
stor := stores.NewRemote(lstor, si, nil, 6000)
|
||||
stor := stores.NewRemote(lstor, si, nil, 6000, &stores.DefaultPartialFileHandler{})
|
||||
|
||||
m := &Manager{
|
||||
ls: st,
|
||||
|
2
extern/sector-storage/piece_provider_test.go
vendored
2
extern/sector-storage/piece_provider_test.go
vendored
@ -31,7 +31,7 @@ func TestPieceProviderReadPiece(t *testing.T) {
|
||||
index := stores.NewIndex()
|
||||
localStore, err := stores.NewLocal(ctx, storage, index, nil)
|
||||
require.NoError(t, err)
|
||||
remoteStore := stores.NewRemote(localStore, index, nil, 6000)
|
||||
remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{})
|
||||
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
|
||||
wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
|
||||
smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
||||
|
9
extern/sector-storage/stores/http_handler.go
vendored
9
extern/sector-storage/stores/http_handler.go
vendored
@ -35,6 +35,15 @@ func (d *DefaultPartialFileHandler) HasAllocated(pf *partialfile.PartialFile, of
|
||||
return pf.HasAllocated(offset, size)
|
||||
}
|
||||
|
||||
func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
|
||||
return pf.Reader(offset, size)
|
||||
}
|
||||
|
||||
// Close closes the partial file
|
||||
func (d *DefaultPartialFileHandler) Close(pf *partialfile.PartialFile) error {
|
||||
return pf.Close()
|
||||
}
|
||||
|
||||
type FetchHandler struct {
|
||||
Local Store
|
||||
PfHandler partialFileHandler
|
||||
|
@ -118,10 +118,9 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
storiface.SectorPaths{}, nil).Times(1)
|
||||
},
|
||||
},
|
||||
"fails when partial file is not found locally": {
|
||||
"fails when error while opening partial file": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
// will return emppty paths
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -131,7 +130,6 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
//OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string)
|
||||
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{},
|
||||
xerrors.New("some error")).Times(1)
|
||||
},
|
||||
@ -140,7 +138,6 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
"fails when determining partial file allocation returns an error": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
// will return emppty paths
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -160,7 +157,6 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
"StatusRequestedRangeNotSatisfiable when piece is NOT allocated in partial file": {
|
||||
expectedStatusCode: http.StatusRequestedRangeNotSatisfiable,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
// will return emppty paths
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -180,7 +176,6 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
"OK when piece is allocated in partial file": {
|
||||
expectedStatusCode: http.StatusOK,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
// will return emppty paths
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
|
9
extern/sector-storage/stores/interface.go
vendored
9
extern/sector-storage/stores/interface.go
vendored
@ -2,6 +2,7 @@ package stores
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
|
||||
@ -18,9 +19,15 @@ type partialFileHandler interface {
|
||||
// size
|
||||
OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialfile.PartialFile, error)
|
||||
|
||||
// HasAllocated returns true if the given partialfile has an unsealed piece starting at the given offset with the given size.
|
||||
// HasAllocated returns true if the given partial file has an unsealed piece starting at the given offset with the given size.
|
||||
// returns false otherwise.
|
||||
HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
|
||||
|
||||
// Reader returns a file from which we can read the unsealed piece in the partial file.
|
||||
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error)
|
||||
|
||||
// Close closes the partial file
|
||||
Close(pf *partialfile.PartialFile) error
|
||||
}
|
||||
|
||||
type Store interface {
|
||||
|
169
extern/sector-storage/stores/mocks/index.go
vendored
Normal file
169
extern/sector-storage/stores/mocks/index.go
vendored
Normal file
@ -0,0 +1,169 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: index.go
|
||||
|
||||
// Package mock_stores is a generated GoMock package.
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
reflect "reflect"
|
||||
|
||||
abi "github.com/filecoin-project/go-state-types/abi"
|
||||
fsutil "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||
stores "github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
storiface "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
)
|
||||
|
||||
// MockSectorIndex is a mock of SectorIndex interface.
|
||||
type MockSectorIndex struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockSectorIndexMockRecorder
|
||||
}
|
||||
|
||||
// MockSectorIndexMockRecorder is the mock recorder for MockSectorIndex.
|
||||
type MockSectorIndexMockRecorder struct {
|
||||
mock *MockSectorIndex
|
||||
}
|
||||
|
||||
// NewMockSectorIndex creates a new mock instance.
|
||||
func NewMockSectorIndex(ctrl *gomock.Controller) *MockSectorIndex {
|
||||
mock := &MockSectorIndex{ctrl: ctrl}
|
||||
mock.recorder = &MockSectorIndexMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockSectorIndex) EXPECT() *MockSectorIndexMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// StorageAttach mocks base method.
|
||||
func (m *MockSectorIndex) StorageAttach(arg0 context.Context, arg1 stores.StorageInfo, arg2 fsutil.FsStat) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageAttach", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// StorageAttach indicates an expected call of StorageAttach.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageAttach(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageAttach", reflect.TypeOf((*MockSectorIndex)(nil).StorageAttach), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// StorageBestAlloc mocks base method.
|
||||
func (m *MockSectorIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]stores.StorageInfo, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageBestAlloc", ctx, allocate, ssize, pathType)
|
||||
ret0, _ := ret[0].([]stores.StorageInfo)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// StorageBestAlloc indicates an expected call of StorageBestAlloc.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageBestAlloc(ctx, allocate, ssize, pathType interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageBestAlloc", reflect.TypeOf((*MockSectorIndex)(nil).StorageBestAlloc), ctx, allocate, ssize, pathType)
|
||||
}
|
||||
|
||||
// StorageDeclareSector mocks base method.
|
||||
func (m *MockSectorIndex) StorageDeclareSector(ctx context.Context, storageID stores.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageDeclareSector", ctx, storageID, s, ft, primary)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// StorageDeclareSector indicates an expected call of StorageDeclareSector.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageDeclareSector(ctx, storageID, s, ft, primary interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDeclareSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDeclareSector), ctx, storageID, s, ft, primary)
|
||||
}
|
||||
|
||||
// StorageDropSector mocks base method.
|
||||
func (m *MockSectorIndex) StorageDropSector(ctx context.Context, storageID stores.ID, s abi.SectorID, ft storiface.SectorFileType) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageDropSector", ctx, storageID, s, ft)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// StorageDropSector indicates an expected call of StorageDropSector.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageDropSector(ctx, storageID, s, ft interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDropSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDropSector), ctx, storageID, s, ft)
|
||||
}
|
||||
|
||||
// StorageFindSector mocks base method.
|
||||
func (m *MockSectorIndex) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]stores.SectorStorageInfo, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageFindSector", ctx, sector, ft, ssize, allowFetch)
|
||||
ret0, _ := ret[0].([]stores.SectorStorageInfo)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// StorageFindSector indicates an expected call of StorageFindSector.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageFindSector(ctx, sector, ft, ssize, allowFetch interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageFindSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageFindSector), ctx, sector, ft, ssize, allowFetch)
|
||||
}
|
||||
|
||||
// StorageInfo mocks base method.
|
||||
func (m *MockSectorIndex) StorageInfo(arg0 context.Context, arg1 stores.ID) (stores.StorageInfo, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageInfo", arg0, arg1)
|
||||
ret0, _ := ret[0].(stores.StorageInfo)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// StorageInfo indicates an expected call of StorageInfo.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageInfo(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageInfo", reflect.TypeOf((*MockSectorIndex)(nil).StorageInfo), arg0, arg1)
|
||||
}
|
||||
|
||||
// StorageLock mocks base method.
|
||||
func (m *MockSectorIndex) StorageLock(ctx context.Context, sector abi.SectorID, read, write storiface.SectorFileType) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageLock", ctx, sector, read, write)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// StorageLock indicates an expected call of StorageLock.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageLock(ctx, sector, read, write interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageLock", reflect.TypeOf((*MockSectorIndex)(nil).StorageLock), ctx, sector, read, write)
|
||||
}
|
||||
|
||||
// StorageReportHealth mocks base method.
|
||||
func (m *MockSectorIndex) StorageReportHealth(arg0 context.Context, arg1 stores.ID, arg2 stores.HealthReport) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageReportHealth", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// StorageReportHealth indicates an expected call of StorageReportHealth.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageReportHealth(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageReportHealth", reflect.TypeOf((*MockSectorIndex)(nil).StorageReportHealth), arg0, arg1, arg2)
|
||||
}
|
||||
|
||||
// StorageTryLock mocks base method.
|
||||
func (m *MockSectorIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read, write storiface.SectorFileType) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "StorageTryLock", ctx, sector, read, write)
|
||||
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// StorageTryLock indicates an expected call of StorageTryLock.
|
||||
func (mr *MockSectorIndexMockRecorder) StorageTryLock(ctx, sector, read, write interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageTryLock", reflect.TypeOf((*MockSectorIndex)(nil).StorageTryLock), ctx, sector, read, write)
|
||||
}
|
30
extern/sector-storage/stores/mocks/stores.go
vendored
30
extern/sector-storage/stores/mocks/stores.go
vendored
@ -6,6 +6,7 @@ package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
os "os"
|
||||
reflect "reflect"
|
||||
|
||||
abi "github.com/filecoin-project/go-state-types/abi"
|
||||
@ -40,6 +41,20 @@ func (m *MockpartialFileHandler) EXPECT() *MockpartialFileHandlerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Close mocks base method.
|
||||
func (m *MockpartialFileHandler) Close(pf *partialfile.PartialFile) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Close", pf)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Close indicates an expected call of Close.
|
||||
func (mr *MockpartialFileHandlerMockRecorder) Close(pf interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockpartialFileHandler)(nil).Close), pf)
|
||||
}
|
||||
|
||||
// HasAllocated mocks base method.
|
||||
func (m *MockpartialFileHandler) HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
@ -70,6 +85,21 @@ func (mr *MockpartialFileHandlerMockRecorder) OpenPartialFile(maxPieceSize, path
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenPartialFile", reflect.TypeOf((*MockpartialFileHandler)(nil).OpenPartialFile), maxPieceSize, path)
|
||||
}
|
||||
|
||||
// Reader mocks base method.
|
||||
func (m *MockpartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Reader", pf, offset, size)
|
||||
ret0, _ := ret[0].(*os.File)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// Reader indicates an expected call of Reader.
|
||||
func (mr *MockpartialFileHandlerMockRecorder) Reader(pf, offset, size interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reader", reflect.TypeOf((*MockpartialFileHandler)(nil).Reader), pf, offset, size)
|
||||
}
|
||||
|
||||
// MockStore is a mock of Store interface.
|
||||
type MockStore struct {
|
||||
ctrl *gomock.Controller
|
||||
|
26
extern/sector-storage/stores/remote.go
vendored
26
extern/sector-storage/stores/remote.go
vendored
@ -17,7 +17,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
|
||||
|
||||
@ -33,7 +32,7 @@ var FetchTempSubdir = "fetching"
|
||||
var CopyBuf = 1 << 20
|
||||
|
||||
type Remote struct {
|
||||
local *Local
|
||||
local Store
|
||||
index SectorIndex
|
||||
auth http.Header
|
||||
|
||||
@ -41,6 +40,8 @@ type Remote struct {
|
||||
|
||||
fetchLk sync.Mutex
|
||||
fetching map[abi.SectorID]chan struct{}
|
||||
|
||||
pfHandler partialFileHandler
|
||||
}
|
||||
|
||||
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
|
||||
@ -51,7 +52,7 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storifa
|
||||
return r.local.RemoveCopies(ctx, s, types)
|
||||
}
|
||||
|
||||
func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int) *Remote {
|
||||
func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler partialFileHandler) *Remote {
|
||||
return &Remote{
|
||||
local: local,
|
||||
index: index,
|
||||
@ -59,7 +60,8 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
|
||||
|
||||
limit: make(chan struct{}, fetchLimit),
|
||||
|
||||
fetching: map[abi.SectorID]chan struct{}{},
|
||||
fetching: map[abi.SectorID]chan struct{}{},
|
||||
pfHandler: pfHandler,
|
||||
}
|
||||
}
|
||||
|
||||
@ -462,7 +464,10 @@ func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.Pa
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth.Clone()
|
||||
|
||||
if r.auth != nil {
|
||||
req.Header = r.auth.Clone()
|
||||
}
|
||||
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
@ -509,27 +514,27 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
||||
}
|
||||
|
||||
// open the unsealed sector file for the given sector size located at the given path.
|
||||
pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
|
||||
pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("opening partial file: %w", err)
|
||||
}
|
||||
|
||||
// even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece
|
||||
// in the unsealed sector file. That is what `HasAllocated` checks for.
|
||||
has, err := pf.HasAllocated(storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
|
||||
has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("has allocated: %w", err)
|
||||
}
|
||||
|
||||
if !has {
|
||||
if err := pf.Close(); err != nil {
|
||||
if err := r.pfHandler.Close(pf); err != nil {
|
||||
return nil, xerrors.Errorf("close partial file: %w", err)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Debugf("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
|
||||
return pf.Reader(storiface.PaddedByteIndex(offset), size)
|
||||
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
|
||||
}
|
||||
|
||||
// --- We don't have the unsealed sector file locally
|
||||
@ -546,9 +551,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
||||
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
|
||||
}
|
||||
|
||||
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ?
|
||||
sort.Slice(si, func(i, j int) bool {
|
||||
return si[i].Weight < si[j].Weight
|
||||
return si[i].Weight > si[j].Weight
|
||||
})
|
||||
|
||||
for _, info := range si {
|
||||
|
418
extern/sector-storage/stores/remote_test.go
vendored
Normal file
418
extern/sector-storage/stores/remote_test.go
vendored
Normal file
@ -0,0 +1,418 @@
|
||||
package stores_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores/mocks"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/gorilla/mux"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
func TestReader(t *testing.T) {
|
||||
logging.SetAllLoggers(logging.LevelDebug)
|
||||
bz := []byte("Hello World")
|
||||
|
||||
pfPath := "path"
|
||||
ft := storiface.FTUnsealed
|
||||
emptyPartialFile := &partialfile.PartialFile{}
|
||||
|
||||
sectorRef := storage.SectorRef{
|
||||
ID: abi.SectorID{
|
||||
Miner: 123,
|
||||
Number: 123,
|
||||
},
|
||||
ProofType: 1,
|
||||
}
|
||||
sectorSize := abi.SealProofInfos[1].SectorSize
|
||||
|
||||
offset := abi.PaddedPieceSize(100)
|
||||
size := abi.PaddedPieceSize(1000)
|
||||
ctx := context.Background()
|
||||
|
||||
tcs := map[string]struct {
|
||||
storeFnc func(s *mocks.MockStore)
|
||||
pfFunc func(s *mocks.MockpartialFileHandler)
|
||||
indexFnc func(s *mocks.MockSectorIndex, serverURL string)
|
||||
|
||||
needHttpServer bool
|
||||
|
||||
getAllocatedReturnCode int
|
||||
getSectorReturnCode int
|
||||
|
||||
serverUrl string
|
||||
|
||||
// expectation
|
||||
errStr string
|
||||
expectedNonNilReader bool
|
||||
expectedSectorBytes []byte
|
||||
}{
|
||||
|
||||
// -------- have the unsealed file locally
|
||||
"fails when error while acquiring unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error"))
|
||||
},
|
||||
|
||||
errStr: "acquire error",
|
||||
},
|
||||
|
||||
"fails when error while opening local partial (unsealed) file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error"))
|
||||
},
|
||||
errStr: "pf open error",
|
||||
},
|
||||
|
||||
"fails when error while checking if local unsealed file has piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, xerrors.New("piece check error"))
|
||||
},
|
||||
|
||||
errStr: "piece check error",
|
||||
},
|
||||
|
||||
"fails when error while closing local unsealed file that does not have the piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(xerrors.New("close error")).Times(1)
|
||||
},
|
||||
errStr: "close error",
|
||||
},
|
||||
|
||||
"fails when error while fetching reader for the local unsealed file that has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, nil)
|
||||
mockPfReader(pf, emptyPartialFile, offset, size, nil, xerrors.New("reader error"))
|
||||
|
||||
},
|
||||
errStr: "reader error",
|
||||
},
|
||||
|
||||
// ------------------- don't have the unsealed file locally
|
||||
|
||||
"fails when error while finding sector": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, xerrors.New("find sector error"))
|
||||
},
|
||||
errStr: "find sector error",
|
||||
},
|
||||
|
||||
"fails when no worker has unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, nil)
|
||||
},
|
||||
errStr: storiface.ErrSectorNotFound.Error(),
|
||||
},
|
||||
|
||||
// --- nil reader when local unsealed file does NOT have unsealed piece
|
||||
"nil reader when local unsealed file does not have the piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
},
|
||||
},
|
||||
|
||||
// ---- nil reader when none of the remote unsealed file has unsealed piece
|
||||
"nil reader when none of the worker has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getAllocatedReturnCode: 500,
|
||||
},
|
||||
|
||||
"nil reader when none of the worker is able to serve the unsealed piece even though they have it": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getSectorReturnCode: 500,
|
||||
getAllocatedReturnCode: 200,
|
||||
},
|
||||
|
||||
// ---- Success for local unsealed file
|
||||
"successfully fetches reader for piece from local unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockpartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, nil)
|
||||
|
||||
f, err := ioutil.TempFile("", "TestReader-")
|
||||
require.NoError(t, err)
|
||||
_, err = f.Write(bz)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, f.Close())
|
||||
f, err = os.Open(f.Name())
|
||||
require.NoError(t, err)
|
||||
|
||||
mockPfReader(pf, emptyPartialFile, offset, size, f, nil)
|
||||
|
||||
},
|
||||
|
||||
expectedNonNilReader: true,
|
||||
expectedSectorBytes: bz,
|
||||
},
|
||||
|
||||
// --- Success for remote unsealed file
|
||||
"successfully fetches reader for piece from remote unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
si := stores.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
|
||||
},
|
||||
|
||||
needHttpServer: true,
|
||||
getSectorReturnCode: 200,
|
||||
getAllocatedReturnCode: 200,
|
||||
expectedSectorBytes: bz,
|
||||
expectedNonNilReader: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tcs {
|
||||
tc := tc
|
||||
t.Run(name, func(t *testing.T) {
|
||||
// create go mock controller here
|
||||
mockCtrl := gomock.NewController(t)
|
||||
// when test is done, assert expectations on all mock objects.
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// create them mocks
|
||||
lstore := mocks.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks.NewMockpartialFileHandler(mockCtrl)
|
||||
index := mocks.NewMockSectorIndex(mockCtrl)
|
||||
|
||||
if tc.storeFnc != nil {
|
||||
tc.storeFnc(lstore)
|
||||
}
|
||||
if tc.pfFunc != nil {
|
||||
tc.pfFunc(pfhandler)
|
||||
}
|
||||
|
||||
if tc.needHttpServer {
|
||||
// run http server
|
||||
ts := httptest.NewServer(&mockHttpServer{
|
||||
expectedSectorName: storiface.SectorName(sectorRef.ID),
|
||||
expectedFileType: ft.String(),
|
||||
expectedOffset: fmt.Sprintf("%d", offset.Unpadded()),
|
||||
expectedSize: fmt.Sprintf("%d", size.Unpadded()),
|
||||
expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType),
|
||||
|
||||
getAllocatedReturnCode: tc.getAllocatedReturnCode,
|
||||
getSectorReturnCode: tc.getSectorReturnCode,
|
||||
getSectorBytes: tc.expectedSectorBytes,
|
||||
})
|
||||
defer ts.Close()
|
||||
tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID))
|
||||
}
|
||||
if tc.indexFnc != nil {
|
||||
tc.indexFnc(index, tc.serverUrl)
|
||||
}
|
||||
|
||||
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
|
||||
|
||||
rd, err := remoteStore.Reader(ctx, sectorRef, offset, size)
|
||||
|
||||
if tc.errStr != "" {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, rd)
|
||||
require.Contains(t, err.Error(), tc.errStr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if !tc.expectedNonNilReader {
|
||||
require.Nil(t, rd)
|
||||
} else {
|
||||
require.NotNil(t, rd)
|
||||
defer func() {
|
||||
require.NoError(t, rd.Close())
|
||||
}()
|
||||
|
||||
if f, ok := rd.(*os.File); ok {
|
||||
require.NoError(t, os.Remove(f.Name()))
|
||||
}
|
||||
|
||||
bz, err := ioutil.ReadAll(rd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedSectorBytes, bz)
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) {
|
||||
l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
Unsealed: pfPath,
|
||||
},
|
||||
storiface.SectorPaths{}, err).Times(1)
|
||||
}
|
||||
|
||||
func mockPartialFileOpen(pf *mocks.MockpartialFileHandler, sectorSize abi.SectorSize, pfPath string, err error) {
|
||||
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{},
|
||||
err).Times(1)
|
||||
}
|
||||
|
||||
func mockCheckAllocation(pf *mocks.MockpartialFileHandler, offset, size abi.PaddedPieceSize, file *partialfile.PartialFile,
|
||||
out bool, err error) {
|
||||
pf.EXPECT().HasAllocated(file, storiface.UnpaddedByteIndex(offset.Unpadded()),
|
||||
size.Unpadded()).Return(out, err).Times(1)
|
||||
}
|
||||
|
||||
func mockPfReader(pf *mocks.MockpartialFileHandler, file *partialfile.PartialFile, offset, size abi.PaddedPieceSize,
|
||||
outFile *os.File, err error) {
|
||||
pf.EXPECT().Reader(file, storiface.PaddedByteIndex(offset), size).Return(outFile, err)
|
||||
}
|
||||
|
||||
type mockHttpServer struct {
|
||||
expectedSectorName string
|
||||
expectedFileType string
|
||||
expectedOffset string
|
||||
expectedSize string
|
||||
expectedSectorType string
|
||||
|
||||
getAllocatedReturnCode int
|
||||
getSectorReturnCode int
|
||||
getSectorBytes []byte
|
||||
}
|
||||
|
||||
func (m *mockHttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
mux := mux.NewRouter()
|
||||
mux.HandleFunc("/remote/{type}/{id}", m.getSector).Methods("GET")
|
||||
mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", m.getAllocated).Methods("GET")
|
||||
mux.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (m *mockHttpServer) getAllocated(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
|
||||
if vars["id"] != m.expectedSectorName {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if vars["type"] != m.expectedFileType {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if vars["spt"] != m.expectedSectorType {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if vars["offset"] != m.expectedOffset {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if vars["size"] != m.expectedSize {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(m.getAllocatedReturnCode)
|
||||
}
|
||||
|
||||
func (m *mockHttpServer) getSector(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
|
||||
if vars["id"] != m.expectedSectorName {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if vars["type"] != m.expectedFileType {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(m.getSectorReturnCode)
|
||||
_, _ = w.Write(m.getSectorBytes)
|
||||
}
|
@ -665,7 +665,7 @@ func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStora
|
||||
}
|
||||
|
||||
func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
|
||||
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit)
|
||||
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
|
||||
}
|
||||
|
||||
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
|
||||
|
Loading…
Reference in New Issue
Block a user