Merge pull request #6280 from filecoin-project/feat/pieceread-outside-scheduler

Bypass task scheduler for reading unsealed pieces
This commit is contained in:
Łukasz Magiera 2021-06-07 18:11:44 +02:00 committed by GitHub
commit fadc79a487
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 3142 additions and 1138 deletions

View File

@ -331,7 +331,7 @@ jobs:
- run: cd extern/filecoin-ffi && make
- run:
name: "go get lotus@master"
command: cd testplans/lotus-soup && go mod edit -replace=github.com/filecoin-project/lotus=../..
command: cd testplans/lotus-soup && go mod edit -replace=github.com/filecoin-project/lotus=../.. && go mod tidy
- run:
name: "build lotus-soup testplan"
command: pushd testplans/lotus-soup && go build -tags=testground .

View File

@ -2,7 +2,6 @@ package api
import (
"context"
"io"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
@ -43,7 +42,6 @@ type Worker interface {
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
ReadPiece(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
TaskDisable(ctx context.Context, tt sealtasks.TaskType) error //perm:admin

File diff suppressed because it is too large Load Diff

View File

@ -4,7 +4,6 @@ package api
import (
"context"
"io"
"time"
"github.com/filecoin-project/go-address"
@ -781,8 +780,6 @@ type WorkerStruct struct {
ProcessSession func(p0 context.Context) (uuid.UUID, error) `perm:"admin"`
ReadPiece func(p0 context.Context, p1 io.Writer, p2 storage.SectorRef, p3 storiface.UnpaddedByteIndex, p4 abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
ReleaseUnsealed func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"`
Remove func(p0 context.Context, p1 abi.SectorID) error `perm:"admin"`
@ -3554,14 +3551,6 @@ func (s *WorkerStub) ProcessSession(p0 context.Context) (uuid.UUID, error) {
return *new(uuid.UUID), xerrors.New("method not supported")
}
func (s *WorkerStruct) ReadPiece(p0 context.Context, p1 io.Writer, p2 storage.SectorRef, p3 storiface.UnpaddedByteIndex, p4 abi.UnpaddedPieceSize) (storiface.CallID, error) {
return s.Internal.ReadPiece(p0, p1, p2, p3, p4)
}
func (s *WorkerStub) ReadPiece(p0 context.Context, p1 io.Writer, p2 storage.SectorRef, p3 storiface.UnpaddedByteIndex, p4 abi.UnpaddedPieceSize) (storiface.CallID, error) {
return *new(storiface.CallID), xerrors.New("method not supported")
}
func (s *WorkerStruct) ReleaseUnsealed(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
return s.Internal.ReleaseUnsealed(p0, p1, p2)
}

View File

@ -441,7 +441,7 @@ func TestOfflineDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, st
require.Eventually(t, func() bool {
cd, _ := s.client.ClientGetDealInfo(s.ctx, *proposalCid)
return cd.State == storagemarket.StorageDealCheckForAcceptance
}, 1*time.Second, 100*time.Millisecond, "actual deal status is %s", storagemarket.DealStates[cd.State])
}, 30*time.Second, 1*time.Second, "actual deal status is %s", storagemarket.DealStates[cd.State])
// Create a CAR file from the raw file
carFileDir, err := ioutil.TempDir(os.TempDir(), "test-make-deal-car")

File diff suppressed because it is too large Load Diff

View File

@ -57,8 +57,8 @@ var (
FullAPIVersion0 = newVer(1, 3, 0)
FullAPIVersion1 = newVer(2, 1, 0)
MinerAPIVersion0 = newVer(1, 0, 1)
WorkerAPIVersion0 = newVer(1, 0, 0)
MinerAPIVersion0 = newVer(1, 1, 0)
WorkerAPIVersion0 = newVer(1, 1, 0)
)
//nolint:varcheck,deadcode

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -6,39 +6,40 @@ package cli
import (
context "context"
reflect "reflect"
go_address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
api "github.com/filecoin-project/lotus/api"
types "github.com/filecoin-project/lotus/chain/types"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockServicesAPI is a mock of ServicesAPI interface
// MockServicesAPI is a mock of ServicesAPI interface.
type MockServicesAPI struct {
ctrl *gomock.Controller
recorder *MockServicesAPIMockRecorder
}
// MockServicesAPIMockRecorder is the mock recorder for MockServicesAPI
// MockServicesAPIMockRecorder is the mock recorder for MockServicesAPI.
type MockServicesAPIMockRecorder struct {
mock *MockServicesAPI
}
// NewMockServicesAPI creates a new mock instance
// NewMockServicesAPI creates a new mock instance.
func NewMockServicesAPI(ctrl *gomock.Controller) *MockServicesAPI {
mock := &MockServicesAPI{ctrl: ctrl}
mock.recorder = &MockServicesAPIMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockServicesAPI) EXPECT() *MockServicesAPIMockRecorder {
return m.recorder
}
// Close mocks base method
// Close mocks base method.
func (m *MockServicesAPI) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
@ -46,13 +47,13 @@ func (m *MockServicesAPI) Close() error {
return ret0
}
// Close indicates an expected call of Close
// Close indicates an expected call of Close.
func (mr *MockServicesAPIMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockServicesAPI)(nil).Close))
}
// DecodeTypedParamsFromJSON mocks base method
// DecodeTypedParamsFromJSON mocks base method.
func (m *MockServicesAPI) DecodeTypedParamsFromJSON(arg0 context.Context, arg1 go_address.Address, arg2 abi.MethodNum, arg3 string) ([]byte, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DecodeTypedParamsFromJSON", arg0, arg1, arg2, arg3)
@ -61,13 +62,13 @@ func (m *MockServicesAPI) DecodeTypedParamsFromJSON(arg0 context.Context, arg1 g
return ret0, ret1
}
// DecodeTypedParamsFromJSON indicates an expected call of DecodeTypedParamsFromJSON
// DecodeTypedParamsFromJSON indicates an expected call of DecodeTypedParamsFromJSON.
func (mr *MockServicesAPIMockRecorder) DecodeTypedParamsFromJSON(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DecodeTypedParamsFromJSON", reflect.TypeOf((*MockServicesAPI)(nil).DecodeTypedParamsFromJSON), arg0, arg1, arg2, arg3)
}
// FullNodeAPI mocks base method
// FullNodeAPI mocks base method.
func (m *MockServicesAPI) FullNodeAPI() api.FullNode {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FullNodeAPI")
@ -75,13 +76,13 @@ func (m *MockServicesAPI) FullNodeAPI() api.FullNode {
return ret0
}
// FullNodeAPI indicates an expected call of FullNodeAPI
// FullNodeAPI indicates an expected call of FullNodeAPI.
func (mr *MockServicesAPIMockRecorder) FullNodeAPI() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FullNodeAPI", reflect.TypeOf((*MockServicesAPI)(nil).FullNodeAPI))
}
// GetBaseFee mocks base method
// GetBaseFee mocks base method.
func (m *MockServicesAPI) GetBaseFee(arg0 context.Context) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetBaseFee", arg0)
@ -90,13 +91,13 @@ func (m *MockServicesAPI) GetBaseFee(arg0 context.Context) (big.Int, error) {
return ret0, ret1
}
// GetBaseFee indicates an expected call of GetBaseFee
// GetBaseFee indicates an expected call of GetBaseFee.
func (mr *MockServicesAPIMockRecorder) GetBaseFee(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBaseFee", reflect.TypeOf((*MockServicesAPI)(nil).GetBaseFee), arg0)
}
// LocalAddresses mocks base method
// LocalAddresses mocks base method.
func (m *MockServicesAPI) LocalAddresses(arg0 context.Context) (go_address.Address, []go_address.Address, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LocalAddresses", arg0)
@ -106,13 +107,13 @@ func (m *MockServicesAPI) LocalAddresses(arg0 context.Context) (go_address.Addre
return ret0, ret1, ret2
}
// LocalAddresses indicates an expected call of LocalAddresses
// LocalAddresses indicates an expected call of LocalAddresses.
func (mr *MockServicesAPIMockRecorder) LocalAddresses(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalAddresses", reflect.TypeOf((*MockServicesAPI)(nil).LocalAddresses), arg0)
}
// MessageForSend mocks base method
// MessageForSend mocks base method.
func (m *MockServicesAPI) MessageForSend(arg0 context.Context, arg1 SendParams) (*api.MessagePrototype, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MessageForSend", arg0, arg1)
@ -121,13 +122,13 @@ func (m *MockServicesAPI) MessageForSend(arg0 context.Context, arg1 SendParams)
return ret0, ret1
}
// MessageForSend indicates an expected call of MessageForSend
// MessageForSend indicates an expected call of MessageForSend.
func (mr *MockServicesAPIMockRecorder) MessageForSend(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MessageForSend", reflect.TypeOf((*MockServicesAPI)(nil).MessageForSend), arg0, arg1)
}
// MpoolCheckPendingMessages mocks base method
// MpoolCheckPendingMessages mocks base method.
func (m *MockServicesAPI) MpoolCheckPendingMessages(arg0 context.Context, arg1 go_address.Address) ([][]api.MessageCheckStatus, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MpoolCheckPendingMessages", arg0, arg1)
@ -136,13 +137,13 @@ func (m *MockServicesAPI) MpoolCheckPendingMessages(arg0 context.Context, arg1 g
return ret0, ret1
}
// MpoolCheckPendingMessages indicates an expected call of MpoolCheckPendingMessages
// MpoolCheckPendingMessages indicates an expected call of MpoolCheckPendingMessages.
func (mr *MockServicesAPIMockRecorder) MpoolCheckPendingMessages(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolCheckPendingMessages", reflect.TypeOf((*MockServicesAPI)(nil).MpoolCheckPendingMessages), arg0, arg1)
}
// MpoolPendingFilter mocks base method
// MpoolPendingFilter mocks base method.
func (m *MockServicesAPI) MpoolPendingFilter(arg0 context.Context, arg1 func(*types.SignedMessage) bool, arg2 types.TipSetKey) ([]*types.SignedMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MpoolPendingFilter", arg0, arg1, arg2)
@ -151,13 +152,13 @@ func (m *MockServicesAPI) MpoolPendingFilter(arg0 context.Context, arg1 func(*ty
return ret0, ret1
}
// MpoolPendingFilter indicates an expected call of MpoolPendingFilter
// MpoolPendingFilter indicates an expected call of MpoolPendingFilter.
func (mr *MockServicesAPIMockRecorder) MpoolPendingFilter(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPendingFilter", reflect.TypeOf((*MockServicesAPI)(nil).MpoolPendingFilter), arg0, arg1, arg2)
}
// PublishMessage mocks base method
// PublishMessage mocks base method.
func (m *MockServicesAPI) PublishMessage(arg0 context.Context, arg1 *api.MessagePrototype, arg2 bool) (*types.SignedMessage, [][]api.MessageCheckStatus, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PublishMessage", arg0, arg1, arg2)
@ -167,13 +168,13 @@ func (m *MockServicesAPI) PublishMessage(arg0 context.Context, arg1 *api.Message
return ret0, ret1, ret2
}
// PublishMessage indicates an expected call of PublishMessage
// PublishMessage indicates an expected call of PublishMessage.
func (mr *MockServicesAPIMockRecorder) PublishMessage(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishMessage", reflect.TypeOf((*MockServicesAPI)(nil).PublishMessage), arg0, arg1, arg2)
}
// RunChecksForPrototype mocks base method
// RunChecksForPrototype mocks base method.
func (m *MockServicesAPI) RunChecksForPrototype(arg0 context.Context, arg1 *api.MessagePrototype) ([][]api.MessageCheckStatus, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RunChecksForPrototype", arg0, arg1)
@ -182,7 +183,7 @@ func (m *MockServicesAPI) RunChecksForPrototype(arg0 context.Context, arg1 *api.
return ret0, ret1
}
// RunChecksForPrototype indicates an expected call of RunChecksForPrototype
// RunChecksForPrototype indicates an expected call of RunChecksForPrototype.
func (mr *MockServicesAPIMockRecorder) RunChecksForPrototype(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunChecksForPrototype", reflect.TypeOf((*MockServicesAPI)(nil).RunChecksForPrototype), arg0, arg1)

View File

@ -362,9 +362,10 @@ var runCmd = &cli.Command{
return xerrors.Errorf("could not get api info: %w", err)
}
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"))
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"),
&stores.DefaultPartialFileHandler{})
fh := &stores.FetchHandler{Local: localStore}
fh := &stores.FetchHandler{Local: localStore, PfHandler: &stores.DefaultPartialFileHandler{}}
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
w.WriteHeader(401)

View File

@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strconv"
@ -453,14 +454,23 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), sectorstorage.SealerConfig{
si := stores.NewIndex()
lstor, err := stores.NewLocal(ctx, lr, si, nil)
if err != nil {
return err
}
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})
smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,
AllowCommit: true,
AllowUnseal: true,
}, nil, sa, wsts, smsts)
}, wsts, smsts)
if err != nil {
return err
}

View File

@ -15,8 +15,6 @@
* [MoveStorage](#MoveStorage)
* [Process](#Process)
* [ProcessSession](#ProcessSession)
* [Read](#Read)
* [ReadPiece](#ReadPiece)
* [Release](#Release)
* [ReleaseUnsealed](#ReleaseUnsealed)
* [Seal](#Seal)
@ -263,41 +261,6 @@ Inputs: `null`
Response: `"07070707-0707-0707-0707-070707070707"`
## Read
### ReadPiece
Perms: admin
Inputs:
```json
[
{},
{
"ID": {
"Miner": 1000,
"Number": 9
},
"ProofType": 8
},
1040384,
1024
]
```
Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```
## Release

View File

@ -11,6 +11,7 @@ import (
"os"
"runtime"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -66,7 +67,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
}
var done func()
var stagedFile *partialFile
var stagedFile *partialfile.PartialFile
defer func() {
if done != nil {
@ -87,7 +88,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = createPartialFile(maxPieceSize, stagedPath.Unsealed)
stagedFile, err = partialfile.CreatePartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
}
@ -97,7 +98,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = openPartialFile(maxPieceSize, stagedPath.Unsealed)
stagedFile, err = partialfile.OpenPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err)
}
@ -257,7 +258,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
// try finding existing
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
var pf *partialFile
var pf *partialfile.PartialFile
switch {
case xerrors.Is(err, storiface.ErrSectorNotFound):
@ -267,7 +268,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
}
defer done()
pf, err = createPartialFile(maxPieceSize, unsealedPath.Unsealed)
pf, err = partialfile.CreatePartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("create unsealed file: %w", err)
}
@ -275,7 +276,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
case err == nil:
defer done()
pf, err = openPartialFile(maxPieceSize, unsealedPath.Unsealed)
pf, err = partialfile.OpenPartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("opening partial file: %w", err)
}
@ -427,7 +428,7 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storag
}
maxPieceSize := abi.PaddedPieceSize(ssize)
pf, err := openPartialFile(maxPieceSize, path.Unsealed)
pf, err := partialfile.OpenPartialFile(maxPieceSize, path.Unsealed)
if err != nil {
if xerrors.Is(err, os.ErrNotExist) {
return false, nil
@ -589,7 +590,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
if len(keepUnsealed) > 0 {
sr := pieceRun(0, maxPieceSize)
sr := partialfile.PieceRun(0, maxPieceSize)
for _, s := range keepUnsealed {
si := &rlepluslazy.RunSliceIterator{}
@ -611,7 +612,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
defer done()
pf, err := openPartialFile(maxPieceSize, paths.Unsealed)
pf, err := partialfile.OpenPartialFile(maxPieceSize, paths.Unsealed)
if err == nil {
var at uint64
for sr.HasNext() {

View File

@ -1,6 +1,7 @@
package ffiwrapper
import (
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
@ -17,7 +18,7 @@ const mergeGaps = 32 << 20
// TODO const expandRuns = 16 << 20 // unseal more than requested for future requests
func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) {
todo := pieceRun(offset.Padded(), size.Padded())
todo := partialfile.PieceRun(offset.Padded(), size.Padded())
todo, err := rlepluslazy.Subtract(todo, unsealed)
if err != nil {
return nil, xerrors.Errorf("compute todo-unsealed: %w", err)

View File

@ -51,13 +51,12 @@ func (r *unpadReader) Read(out []byte) (int, error) {
r.left -= uint64(todo)
n, err := r.src.Read(r.work[:todo])
n, err := io.ReadAtLeast(r.src, r.work[:todo], int(todo))
if err != nil && err != io.EOF {
return n, err
}
if n != int(todo) {
return 0, xerrors.Errorf("didn't read enough: %w", err)
if n < int(todo) {
return 0, xerrors.Errorf("didn't read enough: %d / %d, left %d, out %d", n, todo, r.left, len(out))
}
Unpad(r.work[:todo], out[:todo.Unpadded()])

View File

@ -47,8 +47,6 @@ type Worker interface {
}
type SectorManager interface {
ReadPiece(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ffiwrapper.StorageSealer
storage.Prover
storiface.WorkerReturn
@ -105,24 +103,18 @@ type StorageAuth http.Header
type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
lstor, err := stores.NewLocal(ctx, ls, si, urls)
if err != nil {
return nil, err
}
func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit)
m := &Manager{
ls: ls,
storage: stor,
localStore: lstor,
remoteHnd: &stores.FetchHandler{Local: lstor},
remoteHnd: &stores.FetchHandler{Local: lstor, PfHandler: &stores.DefaultPartialFileHandler{}},
index: si,
sched: newScheduler(),
@ -141,7 +133,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc
go m.sched.runSched()
localTasks := []sealtasks.TaskType{
sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReadUnsealed,
sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
if sc.AllowAddPiece {
localTasks = append(localTasks, sealtasks.TTAddPiece)
@ -206,71 +198,11 @@ func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileTy
}
}
func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error {
return func(ctx context.Context, w Worker) error {
log.Debugf("read piece data from sector %d, offset %d, size %d", sector.ID, offset, size)
r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size))
if err != nil {
return err
}
if r != nil {
*rok = r.(bool)
}
log.Debugf("completed read piece data from sector %d, offset %d, size %d: read ok? %t", sector.ID, offset, size, *rok)
return nil
}
}
func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
defer cancel()
log.Debugf("acquire read sector lock for sector %d", sector.ID)
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
returnErr = xerrors.Errorf("acquiring read sector lock: %w", err)
return
}
log.Debugf("find unsealed sector %d", sector.ID)
// passing 0 spt because we only need it when allowFetch is true
best, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
return
}
foundUnsealed = len(best) > 0
if foundUnsealed { // append to existing
// There is unsealed sector, see if we can read from it
log.Debugf("found unsealed sector %d", sector.ID)
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
log.Debugf("scheduling read of unsealed sector %d", sector.ID)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
m.readPiece(sink, sector, offset, size, &readOk))
if err != nil {
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
}
} else {
log.Debugf("did not find unsealed sector %d", sector.ID)
selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing)
}
return
}
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
log.Debugf("fetch and read piece in sector %d, offset %d, size %d", sector.ID, offset, size)
foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size)
if err != nil {
return err
}
if readOk {
log.Debugf("completed read of unsealed piece in sector %d, offset %d, size %d", sector.ID, offset, size)
return nil
}
// SectorsUnsealPiece will Unseal the Sealed sector file for the given sector.
// It will schedule the Unsealing task on a worker that either already has the sealed sector files or has space in
// one of it's sealing scratch spaces to store them after fetching them from another worker.
// If the chosen worker already has the Unsealed sector file, we will NOT Unseal the sealed sector file again.
func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -279,22 +211,18 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
}
unsealFetch := func(ctx context.Context, worker Worker) error {
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
// put it in the sealing scratch space.
sealFetch := func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
}
if foundUnsealed {
log.Debugf("copy unsealed sector data for sector %d", sector.ID)
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil {
return xerrors.Errorf("copy unsealed sector data: %w", err)
}
}
return nil
}
if unsealed == cid.Undef {
if unsealed == nil {
return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size)
}
@ -303,36 +231,28 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
return xerrors.Errorf("getting sector size: %w", err)
}
log.Debugf("schedule unseal for sector %d", sector.ID)
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
// selector will schedule the Unseal task on a worker that either already has the sealed sector files or has space in
// one of it's sealing scratch spaces to store them after fetching them from another worker.
selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, true)
log.Debugf("will schedule unseal for sector %d", sector.ID)
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, sealFetch, func(ctx context.Context, w Worker) error {
// TODO: make restartable
// NOTE: we're unsealing the whole sector here as with SDR we can't really
// unseal the sector partially. Requesting the whole sector here can
// save us some work in case another piece is requested from here
log.Debugf("unseal sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, unsealed))
log.Debugf("calling unseal sector on worker, sectoID=%d", sector.ID)
// Note: This unseal piece call will essentially become a no-op if the worker already has an Unsealed sector file for the given sector.
_, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, *unsealed))
log.Debugf("completed unseal sector %d", sector.ID)
return err
})
if err != nil {
return err
return xerrors.Errorf("worker UnsealPiece call: %s", err)
}
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
log.Debugf("schedule read piece for sector %d, offset %d, size %d", sector.ID, offset, size)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
m.readPiece(sink, sector, offset, size, &readOk))
if err != nil {
return xerrors.Errorf("reading piece from sealed sector: %w", err)
}
if !readOk {
return xerrors.Errorf("failed to read unsealed piece")
}
log.Debugf("completed read of piece in sector %d, offset %d, size %d", sector.ID, offset, size)
return nil
}
@ -767,4 +687,5 @@ func (m *Manager) Close(ctx context.Context) error {
return m.sched.Close(ctx)
}
var _ Unsealer = &Manager{}
var _ SectorManager = &Manager{}

View File

@ -98,7 +98,7 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
require.NoError(t, err)
stor := stores.NewRemote(lstor, si, nil, 6000)
stor := stores.NewRemote(lstor, si, nil, 6000, &stores.DefaultPartialFileHandler{})
m := &Manager{
ls: st,

View File

@ -6,6 +6,7 @@ import (
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"math/rand"
"sync"
@ -375,13 +376,12 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}
func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if offset != 0 {
panic("implme")
}
_, err := io.CopyN(w, bytes.NewReader(mgr.pieces[mgr.sectors[sectorID.ID].pieces[0]]), int64(size))
return err
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil
}
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
@ -492,6 +492,10 @@ func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID,
panic("not supported")
}
func (mgr *SectorMgr) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error {
return nil
}
func (m mockVerifProver) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -1,4 +1,4 @@
package ffiwrapper
package partialfile
import (
"encoding/binary"
@ -7,6 +7,7 @@ import (
"syscall"
"github.com/detailyang/go-fallocate"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
@ -16,6 +17,8 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
var log = logging.Logger("partialfile")
const veryLargeRle = 1 << 20
// Sectors can be partially unsealed. We support this by appending a small
@ -25,7 +28,7 @@ const veryLargeRle = 1 << 20
// unsealed sector files internally have this structure
// [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
type partialFile struct {
type PartialFile struct {
maxPiece abi.PaddedPieceSize
path string
@ -57,7 +60,7 @@ func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) err
return w.Truncate(maxPieceSize + int64(rb) + 4)
}
func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
func CreatePartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
@ -89,10 +92,10 @@ func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialF
return nil, xerrors.Errorf("close empty partial file: %w", err)
}
return openPartialFile(maxPieceSize, path)
return OpenPartialFile(maxPieceSize, path)
}
func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
func OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
@ -165,7 +168,7 @@ func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFil
return nil, err
}
return &partialFile{
return &PartialFile{
maxPiece: maxPieceSize,
path: path,
allocated: rle,
@ -173,11 +176,11 @@ func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFil
}, nil
}
func (pf *partialFile) Close() error {
func (pf *PartialFile) Close() error {
return pf.file.Close()
}
func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
func (pf *PartialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
@ -188,7 +191,7 @@ func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedP
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
and, err := rlepluslazy.And(have, PieceRun(offset, size))
if err != nil {
return nil, err
}
@ -206,13 +209,13 @@ func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedP
return pf.file, nil
}
func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
func (pf *PartialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
ored, err := rlepluslazy.Or(have, pieceRun(offset, size))
ored, err := rlepluslazy.Or(have, PieceRun(offset, size))
if err != nil {
return err
}
@ -224,7 +227,7 @@ func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.
return nil
}
func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
@ -234,7 +237,7 @@ func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie
return xerrors.Errorf("deallocating: %w", err)
}
s, err := rlepluslazy.Subtract(have, pieceRun(offset, size))
s, err := rlepluslazy.Subtract(have, PieceRun(offset, size))
if err != nil {
return err
}
@ -246,7 +249,7 @@ func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie
return nil
}
func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
@ -257,7 +260,7 @@ func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
and, err := rlepluslazy.And(have, PieceRun(offset, size))
if err != nil {
return nil, err
}
@ -275,17 +278,17 @@ func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP
return pf.file, nil
}
func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) {
func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) {
return pf.allocated.RunIterator()
}
func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
func (pf *PartialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
have, err := pf.Allocated()
if err != nil {
return false, err
}
u, err := rlepluslazy.And(have, pieceRun(offset.Padded(), size.Padded()))
u, err := rlepluslazy.And(have, PieceRun(offset.Padded(), size.Padded()))
if err != nil {
return false, err
}
@ -298,7 +301,7 @@ func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi
return abi.PaddedPieceSize(uc) == size.Padded(), nil
}
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
func PieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
var runs []rlepluslazy.Run
if offset > 0 {
runs = append(runs, rlepluslazy.Run{

153
extern/sector-storage/piece_provider.go vendored Normal file
View File

@ -0,0 +1,153 @@
package sectorstorage
import (
"bufio"
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fr32"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type Unsealer interface {
// SectorsUnsealPiece will Unseal a Sealed sector file for the given sector.
SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error
}
type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
}
type pieceProvider struct {
storage *stores.Remote
index stores.SectorIndex
uns Unsealer
}
func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unsealer) PieceProvider {
return &pieceProvider{
storage: storage,
index: index,
uns: uns,
}
}
// tryReadUnsealedPiece will try to read the unsealed piece from an existing unsealed sector file for the given sector from any worker that has it.
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
if err != nil {
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
cancel()
return nil, nil, err
}
if r == nil {
cancel()
}
return r, cancel, nil
}
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := offset.Valid(); err != nil {
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size)
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
err = nil
}
if err != nil {
log.Errorf("returning error from ReadPiece:%s", err)
return nil, false, err
}
var uns bool
if r == nil {
// a nil reader means that none of the workers has an unsealed sector file
// containing the unsealed piece.
// we now need to unseal a sealed sector file for the given sector to read the unsealed piece from it.
uns = true
commd := &unsealed
if unsealed == cid.Undef {
commd = nil
}
if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil {
log.Errorf("failed to SectorsUnsealPiece: %s", err)
return nil, false, xerrors.Errorf("unsealing piece: %w", err)
}
log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size)
if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
}
if r == nil {
log.Errorf("got no reader after unsealing piece")
return nil, true, xerrors.Errorf("got no reader after unsealing piece")
}
log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
} else {
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size)
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
unlock()
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}
log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
return &funcCloser{
Reader: bufio.NewReaderSize(upr, 127),
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
}
type funcCloser struct {
io.Reader
close func() error
}
func (fc *funcCloser) Close() error { return fc.close() }

View File

@ -0,0 +1,332 @@
package sectorstorage
import (
"bytes"
"context"
"io/ioutil"
"math/rand"
"net"
"net/http"
"testing"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
specstorage "github.com/filecoin-project/specs-storage/storage"
"github.com/gorilla/mux"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// TestPieceProviderReadPiece verifies that the ReadPiece method works correctly
// only uses miner and does NOT use any remote worker.
func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// Set up sector storage manager
sealerCfg := SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,
AllowCommit: true,
AllowUnseal: true,
}
ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1)
defer ppt.shutdown(t)
// Create some padded data that aligns with the piece boundaries.
pieceData := generatePieceData(8 * 127 * 1024 * 8)
size := abi.UnpaddedPieceSize(len(pieceData))
ppt.addPiece(t, pieceData)
// read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)
// pre-commit 1
preCommit1 := ppt.preCommit1(t)
// read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)
// pre-commit 2
ppt.preCommit2(t, preCommit1)
// read piece
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)
// finalize -> nil here will remove unsealed file
ppt.finalizeSector(t, nil)
// Read the piece -> will have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
true, pieceData)
// read the piece -> will not have to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
false, pieceData)
}
func TestReadPieceRemoteWorkers(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
// miner's worker can only add pieces to an unsealed sector.
sealerCfg := SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: false,
AllowPreCommit2: false,
AllowCommit: false,
AllowUnseal: false,
}
// test harness for an 8M sector.
ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1)
defer ppt.shutdown(t)
// worker 2 will ONLY help with the sealing by first fetching
// the unsealed file from the miner.
ppt.addRemoteWorker(t, []sealtasks.TaskType{
sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1,
sealtasks.TTFetch, sealtasks.TTFinalize,
})
// create a worker that can ONLY unseal and fetch
ppt.addRemoteWorker(t, []sealtasks.TaskType{
sealtasks.TTUnseal, sealtasks.TTFetch,
})
// run the test
// add one piece that aligns with the padding/piece boundaries.
pd1 := generatePieceData(8 * 127 * 4 * 1024)
pi1 := ppt.addPiece(t, pd1)
pd1size := pi1.Size.Unpadded()
pd2 := generatePieceData(8 * 127 * 4 * 1024)
pi2 := ppt.addPiece(t, pd2)
pd2size := pi2.Size.Unpadded()
// pre-commit 1
pC1 := ppt.preCommit1(t)
// Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1)
// pre-commit 2
ppt.preCommit2(t, pC1)
// Read the piece -> no need to unseal
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
false, pd1)
// finalize the sector so we declare to the index we have the sealed file
// so the unsealing worker can later look it up and fetch it if needed
// sending nil here will remove all unsealed files after sector is finalized.
ppt.finalizeSector(t, nil)
// Read the piece -> have to unseal since we removed the file.
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1)
// Read the same piece again -> will NOT have to unseal.
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1)
// remove the unsealed file and read again -> will have to unseal.
ppt.removeAllUnsealedSectorFiles(t)
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
true, pd1)
// Read Piece 2 -> no unsealing as it got unsealed above.
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2)
// remove all unseal files -> Read Piece 2 -> will have to Unseal.
ppt.removeAllUnsealedSectorFiles(t)
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2)
}
type pieceProviderTestHarness struct {
ctx context.Context
index *stores.Index
pp PieceProvider
sector specstorage.SectorRef
mgr *Manager
ticket abi.SealRandomness
commD cid.Cid
localStores []*stores.Local
servers []*http.Server
addedPieces []abi.PieceInfo
}
func generatePieceData(size uint64) []byte {
bz := make([]byte, size)
rand.Read(bz)
return bz
}
func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
ctx := context.Background()
// listen on tcp socket to create an http server later
address := "0.0.0.0:0"
nl, err := net.Listen("tcp", address)
require.NoError(t, err)
// create index, storage, local store & remote store.
index := stores.NewIndex()
storage := newTestStorage(t)
localStore, err := stores.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"})
require.NoError(t, err)
remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{})
// data stores for state tracking.
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, wsts, smsts)
require.NoError(t, err)
// start a http server on the manager to serve sector file requests.
svc := &http.Server{
Addr: nl.Addr().String(),
Handler: mgr,
}
go func() {
_ = svc.Serve(nl)
}()
pp := NewPieceProvider(remoteStore, index, mgr)
sector := specstorage.SectorRef{
ID: abi.SectorID{
Miner: 100,
Number: 10,
},
ProofType: sectorProofType,
}
ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9}
ppt := &pieceProviderTestHarness{
ctx: ctx,
index: index,
pp: pp,
sector: sector,
mgr: mgr,
ticket: ticket,
}
ppt.servers = append(ppt.servers, svc)
ppt.localStores = append(ppt.localStores, localStore)
return ppt
}
func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtasks.TaskType) {
// start an http Server
address := "0.0.0.0:0"
nl, err := net.Listen("tcp", address)
require.NoError(t, err)
localStore, err := stores.NewLocal(p.ctx, newTestStorage(t), p.index, []string{"http://" + nl.Addr().String() + "/remote"})
require.NoError(t, err)
fh := &stores.FetchHandler{
Local: localStore,
PfHandler: &stores.DefaultPartialFileHandler{},
}
mux := mux.NewRouter()
mux.PathPrefix("/remote").HandlerFunc(fh.ServeHTTP)
svc := &http.Server{
Addr: nl.Addr().String(),
Handler: mux,
}
go func() {
_ = svc.Serve(nl)
}()
remote := stores.NewRemote(localStore, p.index, nil, 1000,
&stores.DefaultPartialFileHandler{})
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
worker := newLocalWorker(nil, WorkerConfig{
TaskTypes: tasks,
}, remote, localStore, p.index, p.mgr, csts)
p.servers = append(p.servers, svc)
p.localStores = append(p.localStores, localStore)
// register self with manager
require.NoError(t, p.mgr.AddWorker(p.ctx, worker))
}
func (p *pieceProviderTestHarness) removeAllUnsealedSectorFiles(t *testing.T) {
for i := range p.localStores {
ls := p.localStores[i]
require.NoError(t, ls.Remove(p.ctx, p.sector.ID, storiface.FTUnsealed, false))
}
}
func (p *pieceProviderTestHarness) addPiece(t *testing.T, pieceData []byte) abi.PieceInfo {
var existing []abi.UnpaddedPieceSize
for _, pi := range p.addedPieces {
existing = append(existing, pi.Size.Unpadded())
}
size := abi.UnpaddedPieceSize(len(pieceData))
pieceInfo, err := p.mgr.AddPiece(p.ctx, p.sector, existing, size, bytes.NewReader(pieceData))
require.NoError(t, err)
p.addedPieces = append(p.addedPieces, pieceInfo)
return pieceInfo
}
func (p *pieceProviderTestHarness) preCommit1(t *testing.T) specstorage.PreCommit1Out {
preCommit1, err := p.mgr.SealPreCommit1(p.ctx, p.sector, p.ticket, p.addedPieces)
require.NoError(t, err)
return preCommit1
}
func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreCommit1Out) {
sectorCids, err := p.mgr.SealPreCommit2(p.ctx, p.sector, pc1)
require.NoError(t, err)
commD := sectorCids.Unsealed
p.commD = commD
}
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
require.NoError(t, err)
require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed)
defer func() { _ = rd.Close() }()
// Make sure the input matches the output
readData, err := ioutil.ReadAll(rd)
require.NoError(t, err)
require.Equal(t, expectedBytes, readData)
}
func (p *pieceProviderTestHarness) finalizeSector(t *testing.T, keepUnseal []specstorage.Range) {
require.NoError(t, p.mgr.FinalizeSector(p.ctx, p.sector, keepUnseal))
}
func (p *pieceProviderTestHarness) shutdown(t *testing.T) {
for _, svc := range p.servers {
s := svc
require.NoError(t, s.Shutdown(p.ctx))
}
}

View File

@ -313,7 +313,6 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
func init() {
ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately
ResourceTable[sealtasks.TTReadUnsealed] = ResourceTable[sealtasks.TTFetch]
// V1_1 is the same as V1
for _, m := range ResourceTable {

View File

@ -11,21 +11,19 @@ const (
TTFinalize TaskType = "seal/v0/finalize"
TTFetch TaskType = "seal/v0/fetch"
TTUnseal TaskType = "seal/v0/unseal"
TTReadUnsealed TaskType = "seal/v0/unsealread"
TTFetch TaskType = "seal/v0/fetch"
TTUnseal TaskType = "seal/v0/unseal"
)
var order = map[TaskType]int{
TTAddPiece: 6, // least priority
TTPreCommit1: 5,
TTPreCommit2: 4,
TTCommit2: 3,
TTCommit1: 2,
TTUnseal: 1,
TTFetch: -1,
TTReadUnsealed: -1,
TTFinalize: -2, // most priority
TTAddPiece: 6, // least priority
TTPreCommit1: 5,
TTPreCommit2: 4,
TTCommit2: 3,
TTCommit1: 2,
TTUnseal: 1,
TTFetch: -1,
TTFinalize: -2, // most priority
}
var shortNames = map[TaskType]string{
@ -38,9 +36,8 @@ var shortNames = map[TaskType]string{
TTFinalize: "FIN",
TTFetch: "GET",
TTUnseal: "UNS",
TTReadUnsealed: "RD",
TTFetch: "GET",
TTUnseal: "UNS",
}
func (a TaskType) MuchLess(b TaskType) (bool, bool) {

View File

@ -5,7 +5,10 @@ import (
"io"
"net/http"
"os"
"strconv"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
@ -18,8 +21,32 @@ import (
var log = logging.Logger("stores")
var _ partialFileHandler = &DefaultPartialFileHandler{}
// DefaultPartialFileHandler is the default implementation of the partialFileHandler interface.
// This is probably the only implementation we'll ever use because the purpose of the
// interface to is to mock out partial file related functionality during testing.
type DefaultPartialFileHandler struct{}
func (d *DefaultPartialFileHandler) OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialfile.PartialFile, error) {
return partialfile.OpenPartialFile(maxPieceSize, path)
}
func (d *DefaultPartialFileHandler) HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
return pf.HasAllocated(offset, size)
}
func (d *DefaultPartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
return pf.Reader(offset, size)
}
// Close closes the partial file
func (d *DefaultPartialFileHandler) Close(pf *partialfile.PartialFile) error {
return pf.Close()
}
type FetchHandler struct {
*Local
Local Store
PfHandler partialFileHandler
}
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/
@ -29,6 +56,8 @@ func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", handler.remoteGetAllocated).Methods("GET")
mux.ServeHTTP(w, r)
}
@ -54,6 +83,8 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
}
}
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
// returns an error if it does NOT have the required sector file/dir.
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r)
@ -73,7 +104,6 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
}
// The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything
si := storage.SectorRef{
ID: id,
@ -82,7 +112,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
paths, _, err := handler.Local.AcquireSector(r.Context(), si, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Errorf("%+v", err)
log.Errorf("AcquireSector: %+v", err)
w.WriteHeader(500)
return
}
@ -98,37 +128,38 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
stat, err := os.Stat(path)
if err != nil {
log.Errorf("%+v", err)
log.Errorf("os.Stat: %+v", err)
w.WriteHeader(500)
return
}
var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(path)
if _, has := r.Header["Range"]; has {
log.Error("Range not supported on directories")
w.WriteHeader(500)
return
}
rd, err := tarutil.TarDirectory(path)
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
w.Header().Set("Content-Type", "application/x-tar")
w.WriteHeader(200)
if _, err := io.CopyBuffer(w, rd, make([]byte, CopyBuf)); err != nil {
log.Errorf("%+v", err)
return
}
} else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644) // nolint
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
if !stat.IsDir() {
defer func() {
if err := rd.(*os.File).Close(); err != nil {
log.Errorf("closing source file: %+v", err)
}
}()
// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
http.ServeFile(w, r, path)
}
w.WriteHeader(200)
if _, err := io.CopyBuffer(w, rd, make([]byte, CopyBuf)); err != nil {
log.Errorf("%+v", err)
return
}
log.Debugf("served sector file/dir, sectorID=%+v, fileType=%s, path=%s", id, ft, path)
}
func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) {
@ -149,13 +180,120 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
return
}
if err := handler.Remove(r.Context(), id, ft, false); err != nil {
if err := handler.Local.Remove(r.Context(), id, ft, false); err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
}
// remoteGetAllocated returns `http.StatusOK` if the worker already has an Unsealed sector file
// containing the Unsealed piece sent in the request.
// returns `http.StatusRequestedRangeNotSatisfiable` otherwise.
func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE Alloc check %s", r.URL)
vars := mux.Vars(r)
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
log.Errorf("parsing sectorID: %+v", err)
w.WriteHeader(500)
return
}
ft, err := ftFromString(vars["type"])
if err != nil {
log.Errorf("ftFromString: %+v", err)
w.WriteHeader(500)
return
}
if ft != storiface.FTUnsealed {
log.Errorf("/allocated only supports unsealed sector files")
w.WriteHeader(500)
return
}
spti, err := strconv.ParseInt(vars["spt"], 10, 64)
if err != nil {
log.Errorf("parsing spt: %+v", err)
w.WriteHeader(500)
return
}
spt := abi.RegisteredSealProof(spti)
ssize, err := spt.SectorSize()
if err != nil {
log.Errorf("spt.SectorSize(): %+v", err)
w.WriteHeader(500)
return
}
offi, err := strconv.ParseInt(vars["offset"], 10, 64)
if err != nil {
log.Errorf("parsing offset: %+v", err)
w.WriteHeader(500)
return
}
szi, err := strconv.ParseInt(vars["size"], 10, 64)
if err != nil {
log.Errorf("parsing size: %+v", err)
w.WriteHeader(500)
return
}
// The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything
si := storage.SectorRef{
ID: id,
ProofType: 0,
}
// get the path of the local Unsealed file for the given sector.
// return error if we do NOT have it.
paths, _, err := handler.Local.AcquireSector(r.Context(), si, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Errorf("AcquireSector: %+v", err)
w.WriteHeader(500)
return
}
path := storiface.PathByType(paths, ft)
if path == "" {
log.Error("acquired path was empty")
w.WriteHeader(500)
return
}
// open the Unsealed file and check if it has the Unsealed sector for the piece at the given offset and size.
pf, err := handler.PfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
log.Error("opening partial file: ", err)
w.WriteHeader(500)
return
}
defer func() {
if err := pf.Close(); err != nil {
log.Error("closing partial file: ", err)
}
}()
has, err := handler.PfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offi), abi.UnpaddedPieceSize(szi))
if err != nil {
log.Error("has allocated: ", err)
w.WriteHeader(500)
return
}
if has {
log.Debugf("returning ok: worker has unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d", id, offi, szi)
w.WriteHeader(http.StatusOK)
return
}
log.Debugf("returning StatusRequestedRangeNotSatisfiable: worker does NOT have unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d", id, offi, szi)
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
}
func ftFromString(t string) (storiface.SectorFileType, error) {
switch t {
case storiface.FTUnsealed.String():

View File

@ -0,0 +1,457 @@
package stores_test
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/stores/mocks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-storage/storage"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
)
func TestRemoteGetAllocated(t *testing.T) {
emptyPartialFile := &partialfile.PartialFile{}
pfPath := "path"
expectedSectorRef := storage.SectorRef{
ID: abi.SectorID{
Miner: 123,
Number: 123,
},
ProofType: 0,
}
validSectorName := fmt.Sprintf("s-t0%d-%d", 123, 123)
validSectorFileType := storiface.FTUnsealed.String()
validSectorType := "1"
sectorSize := abi.SealProofInfos[1].SectorSize
validOffset := "100"
validOffsetInt := 100
validSize := "1000"
validSizeInt := 1000
type pieceInfo struct {
sectorName string
fileType string
sectorType string
// piece info
offset string
size string
}
validPieceInfo := pieceInfo{
sectorName: validSectorName,
fileType: validSectorFileType,
sectorType: validSectorType,
offset: validOffset,
size: validSize,
}
tcs := map[string]struct {
piFnc func(pi *pieceInfo)
storeFnc func(s *mocks.MockStore)
pfFunc func(s *mocks.MockpartialFileHandler)
// expectation
expectedStatusCode int
}{
"fails when sector name is invalid": {
piFnc: func(pi *pieceInfo) {
pi.sectorName = "invalid"
},
expectedStatusCode: http.StatusInternalServerError,
},
"fails when file type is invalid": {
piFnc: func(pi *pieceInfo) {
pi.fileType = "invalid"
},
expectedStatusCode: http.StatusInternalServerError,
},
"fails when sector proof type is invalid": {
piFnc: func(pi *pieceInfo) {
pi.sectorType = "invalid"
},
expectedStatusCode: http.StatusInternalServerError,
},
"fails when offset is invalid": {
piFnc: func(pi *pieceInfo) {
pi.offset = "invalid"
},
expectedStatusCode: http.StatusInternalServerError,
},
"fails when size is invalid": {
piFnc: func(pi *pieceInfo) {
pi.size = "invalid"
},
expectedStatusCode: http.StatusInternalServerError,
},
"fails when errors out during acquiring unsealed sector file": {
expectedStatusCode: http.StatusInternalServerError,
storeFnc: func(l *mocks.MockStore) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: "path",
},
storiface.SectorPaths{}, xerrors.New("some error")).Times(1)
},
},
"fails when unsealed sector file is not found locally": {
expectedStatusCode: http.StatusInternalServerError,
storeFnc: func(l *mocks.MockStore) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{},
storiface.SectorPaths{}, nil).Times(1)
},
},
"fails when error while opening partial file": {
expectedStatusCode: http.StatusInternalServerError,
storeFnc: func(l *mocks.MockStore) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: pfPath,
},
storiface.SectorPaths{}, nil).Times(1)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{},
xerrors.New("some error")).Times(1)
},
},
"fails when determining partial file allocation returns an error": {
expectedStatusCode: http.StatusInternalServerError,
storeFnc: func(l *mocks.MockStore) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: pfPath,
},
storiface.SectorPaths{}, nil).Times(1)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile,
nil).Times(1)
pf.EXPECT().HasAllocated(emptyPartialFile, storiface.UnpaddedByteIndex(validOffsetInt),
abi.UnpaddedPieceSize(validSizeInt)).Return(true, xerrors.New("some error")).Times(1)
},
},
"StatusRequestedRangeNotSatisfiable when piece is NOT allocated in partial file": {
expectedStatusCode: http.StatusRequestedRangeNotSatisfiable,
storeFnc: func(l *mocks.MockStore) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: pfPath,
},
storiface.SectorPaths{}, nil).Times(1)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile,
nil).Times(1)
pf.EXPECT().HasAllocated(emptyPartialFile, storiface.UnpaddedByteIndex(validOffsetInt),
abi.UnpaddedPieceSize(validSizeInt)).Return(false, nil).Times(1)
},
},
"OK when piece is allocated in partial file": {
expectedStatusCode: http.StatusOK,
storeFnc: func(l *mocks.MockStore) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: pfPath,
},
storiface.SectorPaths{}, nil).Times(1)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile,
nil).Times(1)
pf.EXPECT().HasAllocated(emptyPartialFile, storiface.UnpaddedByteIndex(validOffsetInt),
abi.UnpaddedPieceSize(validSizeInt)).Return(true, nil).Times(1)
},
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
lstore := mocks.NewMockStore(mockCtrl)
pfhandler := mocks.NewMockpartialFileHandler(mockCtrl)
handler := &stores.FetchHandler{
lstore,
pfhandler,
}
// run http server
ts := httptest.NewServer(handler)
defer ts.Close()
pi := validPieceInfo
if tc.piFnc != nil {
tc.piFnc(&pi)
}
if tc.storeFnc != nil {
tc.storeFnc(lstore)
}
if tc.pfFunc != nil {
tc.pfFunc(pfhandler)
}
// call remoteGetAllocated
url := fmt.Sprintf("%s/remote/%s/%s/%s/allocated/%s/%s",
ts.URL,
pi.fileType,
pi.sectorName,
pi.sectorType,
pi.offset,
pi.size)
resp, err := http.Get(url)
require.NoError(t, err)
defer func() {
_ = resp.Body.Close()
}()
// assert expected status code
require.Equal(t, tc.expectedStatusCode, resp.StatusCode)
})
}
}
func TestRemoteGetSector(t *testing.T) {
str := "hello-world"
fileBytes := []byte(str)
validSectorName := fmt.Sprintf("s-t0%d-%d", 123, 123)
validSectorFileType := storiface.FTUnsealed.String()
expectedSectorRef := storage.SectorRef{
ID: abi.SectorID{
Miner: 123,
Number: 123,
},
ProofType: 0,
}
type sectorInfo struct {
sectorName string
fileType string
}
validSectorInfo := sectorInfo{
sectorName: validSectorName,
fileType: validSectorFileType,
}
tcs := map[string]struct {
siFnc func(pi *sectorInfo)
storeFnc func(s *mocks.MockStore, path string)
// reading a file or a dir
isDir bool
// expectation
noResponseBytes bool
expectedContentType string
expectedStatusCode int
expectedResponseBytes []byte
}{
"fails when sector name is invalid": {
siFnc: func(si *sectorInfo) {
si.sectorName = "invalid"
},
expectedStatusCode: http.StatusInternalServerError,
noResponseBytes: true,
},
"fails when file type is invalid": {
siFnc: func(si *sectorInfo) {
si.fileType = "invalid"
},
expectedStatusCode: http.StatusInternalServerError,
noResponseBytes: true,
},
"fails when error while acquiring sector file": {
storeFnc: func(l *mocks.MockStore, _ string) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: "path",
},
storiface.SectorPaths{}, xerrors.New("some error")).Times(1)
},
expectedStatusCode: http.StatusInternalServerError,
noResponseBytes: true,
},
"fails when acquired sector file path is empty": {
expectedStatusCode: http.StatusInternalServerError,
storeFnc: func(l *mocks.MockStore, _ string) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{},
storiface.SectorPaths{}, nil).Times(1)
},
noResponseBytes: true,
},
"fails when acquired file does not exist": {
expectedStatusCode: http.StatusInternalServerError,
storeFnc: func(l *mocks.MockStore, _ string) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: "path",
},
storiface.SectorPaths{}, nil)
},
noResponseBytes: true,
},
"successfully read a sector file": {
storeFnc: func(l *mocks.MockStore, path string) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: path,
},
storiface.SectorPaths{}, nil)
},
noResponseBytes: false,
expectedContentType: "application/octet-stream",
expectedStatusCode: 200,
expectedResponseBytes: fileBytes,
},
"successfully read a sector dir": {
storeFnc: func(l *mocks.MockStore, path string) {
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: path,
},
storiface.SectorPaths{}, nil)
},
isDir: true,
noResponseBytes: false,
expectedContentType: "application/x-tar",
expectedStatusCode: 200,
expectedResponseBytes: fileBytes,
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
lstore := mocks.NewMockStore(mockCtrl)
pfhandler := mocks.NewMockpartialFileHandler(mockCtrl)
var path string
if !tc.isDir {
// create file
tempFile, err := ioutil.TempFile("", "TestRemoteGetSector-")
require.NoError(t, err)
defer func() {
_ = os.Remove(tempFile.Name())
}()
_, err = tempFile.Write(fileBytes)
require.NoError(t, err)
path = tempFile.Name()
} else {
// create dir with a file
tempFile2, err := ioutil.TempFile("", "TestRemoteGetSector-")
require.NoError(t, err)
defer func() {
_ = os.Remove(tempFile2.Name())
}()
stat, err := os.Stat(tempFile2.Name())
require.NoError(t, err)
tempDir, err := ioutil.TempDir("", "TestRemoteGetSector-")
require.NoError(t, err)
defer func() {
_ = os.RemoveAll(tempDir)
}()
require.NoError(t, os.Rename(tempFile2.Name(), filepath.Join(tempDir, stat.Name())))
path = tempDir
}
handler := &stores.FetchHandler{
lstore,
pfhandler,
}
// run http server
ts := httptest.NewServer(handler)
defer ts.Close()
si := validSectorInfo
if tc.siFnc != nil {
tc.siFnc(&si)
}
if tc.storeFnc != nil {
tc.storeFnc(lstore, path)
}
// call remoteGetAllocated
url := fmt.Sprintf("%s/remote/%s/%s",
ts.URL,
si.fileType,
si.sectorName,
)
resp, err := http.Get(url)
require.NoError(t, err)
defer func() {
_ = resp.Body.Close()
}()
bz, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
// assert expected status code
require.Equal(t, tc.expectedStatusCode, resp.StatusCode)
if !tc.noResponseBytes {
if !tc.isDir {
require.EqualValues(t, tc.expectedResponseBytes, bz)
}
}
require.Equal(t, tc.expectedContentType, resp.Header.Get("Content-Type"))
})
}
}

View File

@ -2,8 +2,10 @@ package stores
import (
"context"
"os"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/specs-storage/storage"
@ -11,6 +13,23 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// PartialFileHandler helps mock out the partial file functionality during testing.
type partialFileHandler interface {
// OpenPartialFile opens and returns a partial file at the given path and also verifies it has the given
// size
OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialfile.PartialFile, error)
// HasAllocated returns true if the given partial file has an unsealed piece starting at the given offset with the given size.
// returns false otherwise.
HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
// Reader returns a file from which we can read the unsealed piece in the partial file.
Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error)
// Close closes the partial file
Close(pf *partialfile.PartialFile) error
}
type Store interface {
AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool) error
@ -23,4 +42,6 @@ type Store interface {
MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error
FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error)
}

View File

@ -0,0 +1,169 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: index.go
// Package mock_stores is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi"
fsutil "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
stores "github.com/filecoin-project/lotus/extern/sector-storage/stores"
storiface "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
gomock "github.com/golang/mock/gomock"
)
// MockSectorIndex is a mock of SectorIndex interface.
type MockSectorIndex struct {
ctrl *gomock.Controller
recorder *MockSectorIndexMockRecorder
}
// MockSectorIndexMockRecorder is the mock recorder for MockSectorIndex.
type MockSectorIndexMockRecorder struct {
mock *MockSectorIndex
}
// NewMockSectorIndex creates a new mock instance.
func NewMockSectorIndex(ctrl *gomock.Controller) *MockSectorIndex {
mock := &MockSectorIndex{ctrl: ctrl}
mock.recorder = &MockSectorIndexMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockSectorIndex) EXPECT() *MockSectorIndexMockRecorder {
return m.recorder
}
// StorageAttach mocks base method.
func (m *MockSectorIndex) StorageAttach(arg0 context.Context, arg1 stores.StorageInfo, arg2 fsutil.FsStat) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageAttach", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// StorageAttach indicates an expected call of StorageAttach.
func (mr *MockSectorIndexMockRecorder) StorageAttach(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageAttach", reflect.TypeOf((*MockSectorIndex)(nil).StorageAttach), arg0, arg1, arg2)
}
// StorageBestAlloc mocks base method.
func (m *MockSectorIndex) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]stores.StorageInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageBestAlloc", ctx, allocate, ssize, pathType)
ret0, _ := ret[0].([]stores.StorageInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StorageBestAlloc indicates an expected call of StorageBestAlloc.
func (mr *MockSectorIndexMockRecorder) StorageBestAlloc(ctx, allocate, ssize, pathType interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageBestAlloc", reflect.TypeOf((*MockSectorIndex)(nil).StorageBestAlloc), ctx, allocate, ssize, pathType)
}
// StorageDeclareSector mocks base method.
func (m *MockSectorIndex) StorageDeclareSector(ctx context.Context, storageID stores.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageDeclareSector", ctx, storageID, s, ft, primary)
ret0, _ := ret[0].(error)
return ret0
}
// StorageDeclareSector indicates an expected call of StorageDeclareSector.
func (mr *MockSectorIndexMockRecorder) StorageDeclareSector(ctx, storageID, s, ft, primary interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDeclareSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDeclareSector), ctx, storageID, s, ft, primary)
}
// StorageDropSector mocks base method.
func (m *MockSectorIndex) StorageDropSector(ctx context.Context, storageID stores.ID, s abi.SectorID, ft storiface.SectorFileType) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageDropSector", ctx, storageID, s, ft)
ret0, _ := ret[0].(error)
return ret0
}
// StorageDropSector indicates an expected call of StorageDropSector.
func (mr *MockSectorIndexMockRecorder) StorageDropSector(ctx, storageID, s, ft interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDropSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDropSector), ctx, storageID, s, ft)
}
// StorageFindSector mocks base method.
func (m *MockSectorIndex) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]stores.SectorStorageInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageFindSector", ctx, sector, ft, ssize, allowFetch)
ret0, _ := ret[0].([]stores.SectorStorageInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StorageFindSector indicates an expected call of StorageFindSector.
func (mr *MockSectorIndexMockRecorder) StorageFindSector(ctx, sector, ft, ssize, allowFetch interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageFindSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageFindSector), ctx, sector, ft, ssize, allowFetch)
}
// StorageInfo mocks base method.
func (m *MockSectorIndex) StorageInfo(arg0 context.Context, arg1 stores.ID) (stores.StorageInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageInfo", arg0, arg1)
ret0, _ := ret[0].(stores.StorageInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StorageInfo indicates an expected call of StorageInfo.
func (mr *MockSectorIndexMockRecorder) StorageInfo(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageInfo", reflect.TypeOf((*MockSectorIndex)(nil).StorageInfo), arg0, arg1)
}
// StorageLock mocks base method.
func (m *MockSectorIndex) StorageLock(ctx context.Context, sector abi.SectorID, read, write storiface.SectorFileType) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageLock", ctx, sector, read, write)
ret0, _ := ret[0].(error)
return ret0
}
// StorageLock indicates an expected call of StorageLock.
func (mr *MockSectorIndexMockRecorder) StorageLock(ctx, sector, read, write interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageLock", reflect.TypeOf((*MockSectorIndex)(nil).StorageLock), ctx, sector, read, write)
}
// StorageReportHealth mocks base method.
func (m *MockSectorIndex) StorageReportHealth(arg0 context.Context, arg1 stores.ID, arg2 stores.HealthReport) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageReportHealth", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// StorageReportHealth indicates an expected call of StorageReportHealth.
func (mr *MockSectorIndexMockRecorder) StorageReportHealth(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageReportHealth", reflect.TypeOf((*MockSectorIndex)(nil).StorageReportHealth), arg0, arg1, arg2)
}
// StorageTryLock mocks base method.
func (m *MockSectorIndex) StorageTryLock(ctx context.Context, sector abi.SectorID, read, write storiface.SectorFileType) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageTryLock", ctx, sector, read, write)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StorageTryLock indicates an expected call of StorageTryLock.
func (mr *MockSectorIndexMockRecorder) StorageTryLock(ctx, sector, read, write interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageTryLock", reflect.TypeOf((*MockSectorIndex)(nil).StorageTryLock), ctx, sector, read, write)
}

View File

@ -0,0 +1,212 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: interface.go
// Package mock_stores is a generated GoMock package.
package mocks
import (
context "context"
os "os"
reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi"
fsutil "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
partialfile "github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
stores "github.com/filecoin-project/lotus/extern/sector-storage/stores"
storiface "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
storage "github.com/filecoin-project/specs-storage/storage"
gomock "github.com/golang/mock/gomock"
)
// MockpartialFileHandler is a mock of partialFileHandler interface.
type MockpartialFileHandler struct {
ctrl *gomock.Controller
recorder *MockpartialFileHandlerMockRecorder
}
// MockpartialFileHandlerMockRecorder is the mock recorder for MockpartialFileHandler.
type MockpartialFileHandlerMockRecorder struct {
mock *MockpartialFileHandler
}
// NewMockpartialFileHandler creates a new mock instance.
func NewMockpartialFileHandler(ctrl *gomock.Controller) *MockpartialFileHandler {
mock := &MockpartialFileHandler{ctrl: ctrl}
mock.recorder = &MockpartialFileHandlerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockpartialFileHandler) EXPECT() *MockpartialFileHandlerMockRecorder {
return m.recorder
}
// Close mocks base method.
func (m *MockpartialFileHandler) Close(pf *partialfile.PartialFile) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close", pf)
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockpartialFileHandlerMockRecorder) Close(pf interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockpartialFileHandler)(nil).Close), pf)
}
// HasAllocated mocks base method.
func (m *MockpartialFileHandler) HasAllocated(pf *partialfile.PartialFile, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HasAllocated", pf, offset, size)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// HasAllocated indicates an expected call of HasAllocated.
func (mr *MockpartialFileHandlerMockRecorder) HasAllocated(pf, offset, size interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasAllocated", reflect.TypeOf((*MockpartialFileHandler)(nil).HasAllocated), pf, offset, size)
}
// OpenPartialFile mocks base method.
func (m *MockpartialFileHandler) OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialfile.PartialFile, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "OpenPartialFile", maxPieceSize, path)
ret0, _ := ret[0].(*partialfile.PartialFile)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// OpenPartialFile indicates an expected call of OpenPartialFile.
func (mr *MockpartialFileHandlerMockRecorder) OpenPartialFile(maxPieceSize, path interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenPartialFile", reflect.TypeOf((*MockpartialFileHandler)(nil).OpenPartialFile), maxPieceSize, path)
}
// Reader mocks base method.
func (m *MockpartialFileHandler) Reader(pf *partialfile.PartialFile, offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Reader", pf, offset, size)
ret0, _ := ret[0].(*os.File)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Reader indicates an expected call of Reader.
func (mr *MockpartialFileHandlerMockRecorder) Reader(pf, offset, size interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reader", reflect.TypeOf((*MockpartialFileHandler)(nil).Reader), pf, offset, size)
}
// MockStore is a mock of Store interface.
type MockStore struct {
ctrl *gomock.Controller
recorder *MockStoreMockRecorder
}
// MockStoreMockRecorder is the mock recorder for MockStore.
type MockStoreMockRecorder struct {
mock *MockStore
}
// NewMockStore creates a new mock instance.
func NewMockStore(ctrl *gomock.Controller) *MockStore {
mock := &MockStore{ctrl: ctrl}
mock.recorder = &MockStoreMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockStore) EXPECT() *MockStoreMockRecorder {
return m.recorder
}
// AcquireSector mocks base method.
func (m *MockStore) AcquireSector(ctx context.Context, s storage.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AcquireSector", ctx, s, existing, allocate, sealing, op)
ret0, _ := ret[0].(storiface.SectorPaths)
ret1, _ := ret[1].(storiface.SectorPaths)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// AcquireSector indicates an expected call of AcquireSector.
func (mr *MockStoreMockRecorder) AcquireSector(ctx, s, existing, allocate, sealing, op interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireSector", reflect.TypeOf((*MockStore)(nil).AcquireSector), ctx, s, existing, allocate, sealing, op)
}
// FsStat mocks base method.
func (m *MockStore) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FsStat", ctx, id)
ret0, _ := ret[0].(fsutil.FsStat)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FsStat indicates an expected call of FsStat.
func (mr *MockStoreMockRecorder) FsStat(ctx, id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FsStat", reflect.TypeOf((*MockStore)(nil).FsStat), ctx, id)
}
// MoveStorage mocks base method.
func (m *MockStore) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MoveStorage", ctx, s, types)
ret0, _ := ret[0].(error)
return ret0
}
// MoveStorage indicates an expected call of MoveStorage.
func (mr *MockStoreMockRecorder) MoveStorage(ctx, s, types interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveStorage", reflect.TypeOf((*MockStore)(nil).MoveStorage), ctx, s, types)
}
// Remove mocks base method.
func (m *MockStore) Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Remove", ctx, s, types, force)
ret0, _ := ret[0].(error)
return ret0
}
// Remove indicates an expected call of Remove.
func (mr *MockStoreMockRecorder) Remove(ctx, s, types, force interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockStore)(nil).Remove), ctx, s, types, force)
}
// RemoveCopies mocks base method.
func (m *MockStore) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RemoveCopies", ctx, s, types)
ret0, _ := ret[0].(error)
return ret0
}
// RemoveCopies indicates an expected call of RemoveCopies.
func (mr *MockStoreMockRecorder) RemoveCopies(ctx, s, types interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveCopies", reflect.TypeOf((*MockStore)(nil).RemoveCopies), ctx, s, types)
}
// Reserve mocks base method.
func (m *MockStore) Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Reserve", ctx, sid, ft, storageIDs, overheadTab)
ret0, _ := ret[0].(func())
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Reserve indicates an expected call of Reserve.
func (mr *MockStoreMockRecorder) Reserve(ctx, sid, ft, storageIDs, overheadTab interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reserve", reflect.TypeOf((*MockStore)(nil).Reserve), ctx, sid, ft, storageIDs, overheadTab)
}

View File

@ -3,6 +3,7 @@ package stores
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/bits"
@ -31,7 +32,7 @@ var FetchTempSubdir = "fetching"
var CopyBuf = 1 << 20
type Remote struct {
local *Local
local Store
index SectorIndex
auth http.Header
@ -39,6 +40,8 @@ type Remote struct {
fetchLk sync.Mutex
fetching map[abi.SectorID]chan struct{}
pfHandler partialFileHandler
}
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
@ -49,7 +52,7 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storifa
return r.local.RemoveCopies(ctx, s, types)
}
func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int) *Remote {
func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler partialFileHandler) *Remote {
return &Remote{
local: local,
index: index,
@ -57,7 +60,8 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
limit: make(chan struct{}, fetchLimit),
fetching: map[abi.SectorID]chan struct{}{},
fetching: map[abi.SectorID]chan struct{}{},
pfHandler: pfHandler,
}
}
@ -415,4 +419,185 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
return out, nil
}
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded())
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return false, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth.Clone()
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close() // nolint
switch resp.StatusCode {
case http.StatusOK:
return true, nil
case http.StatusRequestedRangeNotSatisfiable:
return false, nil
default:
return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode)
}
}
func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
if len(r.limit) >= cap(r.limit) {
log.Infof("Throttling remote read, %d already running", len(r.limit))
}
// TODO: Smarter throttling
// * Priority (just going sequentially is still pretty good)
// * Per interface
// * Aware of remote load
select {
case r.limit <- struct{}{}:
defer func() { <-r.limit }()
case <-ctx.Done():
return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, xerrors.Errorf("request: %w", err)
}
if r.auth != nil {
req.Header = r.auth.Clone()
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, xerrors.Errorf("do request: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close() // nolint
return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
return resp.Body, nil
}
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// If the Miner has the unsealed piece locally, it will return a reader that reads from the local copy.
// If the Miner does NOT have the unsealed piece locally, it will query all workers that have the unsealed sector file
// to know if they have the unsealed piece and will then read the unsealed piece data from a worker that has it.
//
// Returns a nil reader if :
// 1. no worker(local worker included) has an unsealed file for the given sector OR
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
ft := storiface.FTUnsealed
// check if we have the unsealed sector file locally
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire local: %w", err)
}
path := storiface.PathByType(paths, ft)
if path != "" {
// if we have the unsealed file locally, return a reader that can be used to read the contents of the
// unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size)
ssize, err := s.ProofType.SectorSize()
if err != nil {
return nil, err
}
log.Debugf("fetched sector size %s (+%d,%d)", path, offset, size)
// open the unsealed sector file for the given sector size located at the given path.
pf, err := r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file opened %s (+%d,%d)", path, offset, size)
// even though we have an unsealed file for the given sector, we still need to determine if we have the unsealed piece
// in the unsealed sector file. That is what `HasAllocated` checks for.
has, err := r.pfHandler.HasAllocated(pf, storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
if err != nil {
return nil, xerrors.Errorf("has allocated: %w", err)
}
log.Debugf("check if partial file is allocated %s (+%d,%d)", path, offset, size)
if !has {
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
if err := r.pfHandler.Close(pf); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
return nil, nil
}
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
}
// --- We don't have the unsealed sector file locally
// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
// to determine which workers have the unsealed file and then query those workers to know
// if they have the unsealed piece in the unsealed sector file.
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
log.Debugf("Reader, did not find unsealed file on any of the workers %s (+%d,%d)", path, offset, size)
return nil, err
}
if len(si) == 0 {
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
sort.Slice(si, func(i, j int) bool {
return si[i].Weight > si[j].Weight
})
var lastErr error
for _, info := range si {
for _, url := range info.URLs {
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
// unsealed piece in their unsealed sector file.
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
lastErr = err
continue
}
if !ok {
continue
}
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
lastErr = err
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
return rd, nil
}
}
// we couldn't find a unsealed file with the unsealed piece, will return a nil reader.
log.Debugf("returning nil reader, did not find unsealed piece for %+v (+%d,%d), last error=%s", s, offset, size, lastErr)
return nil, nil
}
func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
log.Warnf("reserve called on remote store, sectorID: %v", sid.ID)
return func() {
}, nil
}
var _ Store = &Remote{}

View File

@ -0,0 +1,418 @@
package stores_test
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/stores/mocks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-storage/storage"
"github.com/golang/mock/gomock"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
)
func TestReader(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
bz := []byte("Hello World")
pfPath := "path"
ft := storiface.FTUnsealed
emptyPartialFile := &partialfile.PartialFile{}
sectorRef := storage.SectorRef{
ID: abi.SectorID{
Miner: 123,
Number: 123,
},
ProofType: 1,
}
sectorSize := abi.SealProofInfos[1].SectorSize
offset := abi.PaddedPieceSize(100)
size := abi.PaddedPieceSize(1000)
ctx := context.Background()
tcs := map[string]struct {
storeFnc func(s *mocks.MockStore)
pfFunc func(s *mocks.MockpartialFileHandler)
indexFnc func(s *mocks.MockSectorIndex, serverURL string)
needHttpServer bool
getAllocatedReturnCode int
getSectorReturnCode int
serverUrl string
// expectation
errStr string
expectedNonNilReader bool
expectedSectorBytes []byte
}{
// -------- have the unsealed file locally
"fails when error while acquiring unsealed file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error"))
},
errStr: "acquire error",
},
"fails when error while opening local partial (unsealed) file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error"))
},
errStr: "pf open error",
},
"fails when error while checking if local unsealed file has piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
true, xerrors.New("piece check error"))
},
errStr: "piece check error",
},
"fails when error while closing local unsealed file that does not have the piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
false, nil)
pf.EXPECT().Close(emptyPartialFile).Return(xerrors.New("close error")).Times(1)
},
errStr: "close error",
},
"fails when error while fetching reader for the local unsealed file that has the unsealed piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
true, nil)
mockPfReader(pf, emptyPartialFile, offset, size, nil, xerrors.New("reader error"))
},
errStr: "reader error",
},
// ------------------- don't have the unsealed file locally
"fails when error while finding sector": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return(nil, xerrors.New("find sector error"))
},
errStr: "find sector error",
},
"fails when no worker has unsealed file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return(nil, nil)
},
errStr: storiface.ErrSectorNotFound.Error(),
},
// --- nil reader when local unsealed file does NOT have unsealed piece
"nil reader when local unsealed file does not have the piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
false, nil)
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
},
},
// ---- nil reader when none of the remote unsealed file has unsealed piece
"nil reader when none of the worker has the unsealed piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, url string) {
si := stores.SectorStorageInfo{
URLs: []string{url},
}
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
},
needHttpServer: true,
getAllocatedReturnCode: 500,
},
"nil reader when none of the worker is able to serve the unsealed piece even though they have it": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, url string) {
si := stores.SectorStorageInfo{
URLs: []string{url},
}
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
},
needHttpServer: true,
getSectorReturnCode: 500,
getAllocatedReturnCode: 200,
},
// ---- Success for local unsealed file
"successfully fetches reader for piece from local unsealed file": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, pfPath, nil)
},
pfFunc: func(pf *mocks.MockpartialFileHandler) {
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
mockCheckAllocation(pf, offset, size, emptyPartialFile,
true, nil)
f, err := ioutil.TempFile("", "TestReader-")
require.NoError(t, err)
_, err = f.Write(bz)
require.NoError(t, err)
require.NoError(t, f.Close())
f, err = os.Open(f.Name())
require.NoError(t, err)
mockPfReader(pf, emptyPartialFile, offset, size, f, nil)
},
expectedNonNilReader: true,
expectedSectorBytes: bz,
},
// --- Success for remote unsealed file
"successfully fetches reader for piece from remote unsealed piece": {
storeFnc: func(l *mocks.MockStore) {
mockSectorAcquire(l, sectorRef, "", nil)
},
indexFnc: func(in *mocks.MockSectorIndex, url string) {
si := stores.SectorStorageInfo{
URLs: []string{url},
}
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
false).Return([]stores.SectorStorageInfo{si}, nil).Times(1)
},
needHttpServer: true,
getSectorReturnCode: 200,
getAllocatedReturnCode: 200,
expectedSectorBytes: bz,
expectedNonNilReader: true,
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create them mocks
lstore := mocks.NewMockStore(mockCtrl)
pfhandler := mocks.NewMockpartialFileHandler(mockCtrl)
index := mocks.NewMockSectorIndex(mockCtrl)
if tc.storeFnc != nil {
tc.storeFnc(lstore)
}
if tc.pfFunc != nil {
tc.pfFunc(pfhandler)
}
if tc.needHttpServer {
// run http server
ts := httptest.NewServer(&mockHttpServer{
expectedSectorName: storiface.SectorName(sectorRef.ID),
expectedFileType: ft.String(),
expectedOffset: fmt.Sprintf("%d", offset.Unpadded()),
expectedSize: fmt.Sprintf("%d", size.Unpadded()),
expectedSectorType: fmt.Sprintf("%d", sectorRef.ProofType),
getAllocatedReturnCode: tc.getAllocatedReturnCode,
getSectorReturnCode: tc.getSectorReturnCode,
getSectorBytes: tc.expectedSectorBytes,
})
defer ts.Close()
tc.serverUrl = fmt.Sprintf("%s/remote/%s/%s", ts.URL, ft.String(), storiface.SectorName(sectorRef.ID))
}
if tc.indexFnc != nil {
tc.indexFnc(index, tc.serverUrl)
}
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
rd, err := remoteStore.Reader(ctx, sectorRef, offset, size)
if tc.errStr != "" {
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
} else {
require.NoError(t, err)
}
if !tc.expectedNonNilReader {
require.Nil(t, rd)
} else {
require.NotNil(t, rd)
defer func() {
require.NoError(t, rd.Close())
}()
if f, ok := rd.(*os.File); ok {
require.NoError(t, os.Remove(f.Name()))
}
bz, err := ioutil.ReadAll(rd)
require.NoError(t, err)
require.Equal(t, tc.expectedSectorBytes, bz)
}
})
}
}
func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) {
l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed,
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
Unsealed: pfPath,
},
storiface.SectorPaths{}, err).Times(1)
}
func mockPartialFileOpen(pf *mocks.MockpartialFileHandler, sectorSize abi.SectorSize, pfPath string, err error) {
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{},
err).Times(1)
}
func mockCheckAllocation(pf *mocks.MockpartialFileHandler, offset, size abi.PaddedPieceSize, file *partialfile.PartialFile,
out bool, err error) {
pf.EXPECT().HasAllocated(file, storiface.UnpaddedByteIndex(offset.Unpadded()),
size.Unpadded()).Return(out, err).Times(1)
}
func mockPfReader(pf *mocks.MockpartialFileHandler, file *partialfile.PartialFile, offset, size abi.PaddedPieceSize,
outFile *os.File, err error) {
pf.EXPECT().Reader(file, storiface.PaddedByteIndex(offset), size).Return(outFile, err)
}
type mockHttpServer struct {
expectedSectorName string
expectedFileType string
expectedOffset string
expectedSize string
expectedSectorType string
getAllocatedReturnCode int
getSectorReturnCode int
getSectorBytes []byte
}
func (m *mockHttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux := mux.NewRouter()
mux.HandleFunc("/remote/{type}/{id}", m.getSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", m.getAllocated).Methods("GET")
mux.ServeHTTP(w, r)
}
func (m *mockHttpServer) getAllocated(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
if vars["id"] != m.expectedSectorName {
w.WriteHeader(http.StatusBadRequest)
return
}
if vars["type"] != m.expectedFileType {
w.WriteHeader(http.StatusBadRequest)
return
}
if vars["spt"] != m.expectedSectorType {
w.WriteHeader(http.StatusBadRequest)
return
}
if vars["offset"] != m.expectedOffset {
w.WriteHeader(http.StatusBadRequest)
return
}
if vars["size"] != m.expectedSize {
w.WriteHeader(http.StatusBadRequest)
return
}
w.WriteHeader(m.getAllocatedReturnCode)
}
func (m *mockHttpServer) getSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
if vars["id"] != m.expectedSectorName {
w.WriteHeader(http.StatusBadRequest)
return
}
if vars["type"] != m.expectedFileType {
w.WriteHeader(http.StatusBadRequest)
return
}
w.WriteHeader(m.getSectorReturnCode)
_, _ = w.Write(m.getSectorBytes)
}

View File

@ -2,8 +2,10 @@ package stores
import (
"bytes"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"github.com/mitchellh/go-homedir"
@ -33,7 +35,18 @@ func move(from, to string) error {
// can do better
var errOut bytes.Buffer
cmd := exec.Command("/usr/bin/env", "mv", "-t", toDir, from) // nolint
var cmd *exec.Cmd
if runtime.GOOS == "darwin" {
if err := os.MkdirAll(toDir, 0777); err != nil {
return xerrors.Errorf("failed exec MkdirAll: %s", err)
}
cmd = exec.Command("/usr/bin/env", "mv", from, toDir) // nolint
} else {
cmd = exec.Command("/usr/bin/env", "mv", "-t", toDir, from) // nolint
}
cmd.Stderr = &errOut
if err := cmd.Run(); err != nil {
return xerrors.Errorf("exec mv (stderr: %s): %w", strings.TrimSpace(errOut.String()), err)

View File

@ -5,6 +5,7 @@ import (
"errors"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
)
@ -17,6 +18,14 @@ func (i UnpaddedByteIndex) Padded() PaddedByteIndex {
return PaddedByteIndex(abi.UnpaddedPieceSize(i).Padded())
}
func (i UnpaddedByteIndex) Valid() error {
if i%127 != 0 {
return xerrors.Errorf("unpadded byte index must be a multiple of 127")
}
return nil
}
type PaddedByteIndex uint64
type RGetter func(ctx context.Context, id abi.SectorID) (cid.Cid, error)

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"time"
"github.com/google/uuid"
@ -87,7 +86,6 @@ type WorkerCalls interface {
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
ReadPiece(context.Context, io.Writer, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize) (CallID, error)
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
}

View File

@ -161,7 +161,6 @@ const (
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
ReadPiece ReturnType = "ReadPiece"
Fetch ReturnType = "Fetch"
)
@ -209,7 +208,6 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
ReadPiece: rfunc(storiface.WorkerReturn.ReturnReadPiece),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
}
@ -430,6 +428,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef,
}
return l.asyncCall(ctx, sector, UnsealPiece, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
log.Debugf("worker will unseal piece now, sector=%+v", sector.ID)
if err = sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil {
return nil, xerrors.Errorf("unsealing sector: %w", err)
}
@ -442,21 +441,12 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef,
return nil, xerrors.Errorf("removing source data: %w", err)
}
log.Debugf("worker has unsealed piece, sector=%+v", sector.ID)
return nil, nil
})
}
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
return l.asyncCall(ctx, sector, ReadPiece, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
return sb.ReadPiece(ctx, writer, sector, index, size)
})
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
l.taskLk.Lock()
defer l.taskLk.Unlock()

View File

@ -2,7 +2,6 @@ package sectorstorage
import (
"context"
"io"
"sync"
"time"
@ -156,8 +155,4 @@ func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, i
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
}
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
}
var _ Worker = &trackedWorker{}

2
go.mod
View File

@ -55,7 +55,7 @@ require (
github.com/gdamore/tcell/v2 v2.2.0
github.com/go-kit/kit v0.10.0
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/golang/mock v1.4.4
github.com/golang/mock v1.5.0
github.com/google/uuid v1.1.2
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2

3
go.sum
View File

@ -416,8 +416,9 @@ github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.5.0 h1:jlYHihg//f7RRwuPfptm04yp4s7O6Kw8EZiVYIGcH0g=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

View File

@ -5,6 +5,7 @@ import (
"io"
"github.com/filecoin-project/lotus/api/v1api"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
@ -25,15 +26,15 @@ import (
var log = logging.Logger("retrievaladapter")
type retrievalProviderNode struct {
miner *storage.Miner
sealer sectorstorage.SectorManager
full v1api.FullNode
miner *storage.Miner
pp sectorstorage.PieceProvider
full v1api.FullNode
}
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node
func NewRetrievalProviderNode(miner *storage.Miner, sealer sectorstorage.SectorManager, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sealer, full}
func NewRetrievalProviderNode(miner *storage.Miner, pp sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, pp, full}
}
func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
@ -67,24 +68,18 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi
ProofType: si.SectorType,
}
// Set up a pipe so that data can be written from the unsealing process
// into the reader returned by this function
r, w := io.Pipe()
go func() {
var commD cid.Cid
if si.CommD != nil {
commD = *si.CommD
}
var commD cid.Cid
if si.CommD != nil {
commD = *si.CommD
}
// Read the piece into the pipe's writer, unsealing the piece if necessary
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
err := rpn.sealer.ReadPiece(ctx, w, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD)
if err != nil {
log.Errorf("failed to unseal piece from sector %d: %s", sectorID, err)
}
// Close the reader with any error that was returned while reading the piece
_ = w.CloseWithError(err)
}()
// Get a reader for the piece, unsealing the piece if necessary
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
r, unsealed, err := rpn.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD)
if err != nil {
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
}
_ = unsealed // todo: use
return r, nil
}

View File

@ -375,9 +375,12 @@ var MinerNode = Options(
Override(new(*stores.Index), stores.NewIndex),
Override(new(stores.SectorIndex), From(new(*stores.Index))),
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
Override(new(*stores.Local), modules.LocalStorage),
Override(new(*stores.Remote), modules.RemoteStorage),
Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))),
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
// Sector storage: Proofs
Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier),
@ -405,6 +408,7 @@ var MinerNode = Options(
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
// Markets (retrieval)
Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(HandleRetrievalKey, modules.HandleRetrieval),

View File

@ -38,6 +38,16 @@ func connectRemoteWorker(ctx context.Context, fa api.Common, url string) (*remot
return nil, xerrors.Errorf("creating jsonrpc client: %w", err)
}
wver, err := wapi.Version(ctx)
if err != nil {
closer()
return nil, err
}
if !wver.EqMajorMinor(api.WorkerAPIVersion0) {
return nil, xerrors.Errorf("unsupported worker api version: %s (expected %s)", wver, api.WorkerAPIVersion0)
}
return &remoteWorker{wapi, closer}, nil
}

View File

@ -637,17 +637,15 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host,
miner *storage.Miner,
sealer sectorstorage.SectorManager,
full v1api.FullNode,
ds dtypes.MetadataDS,
pieceStore dtypes.ProviderPieceStore,
mds dtypes.StagingMultiDstore,
dt dtypes.ProviderDataTransfer,
onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc,
pieceProvider sectorstorage.PieceProvider,
userFilter dtypes.RetrievalDealFilter,
) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
adapter := retrievaladapter.NewRetrievalProviderNode(miner, pieceProvider, full)
maddr, err := minerAddrFromDS(ds)
if err != nil {
@ -663,13 +661,22 @@ func RetrievalProvider(h host.Host,
var WorkerCallsPrefix = datastore.NewKey("/worker/calls")
var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls")
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, urls sectorstorage.URLs) (*stores.Local, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
return stores.NewLocal(ctx, ls, si, urls)
}
func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
}
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix))
sst, err := sectorstorage.New(ctx, ls, si, sc, urls, sa, wsts, smsts)
sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, wsts, smsts)
if err != nil {
return nil, err
}

View File

@ -521,9 +521,14 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes
}
fulls[i].Stb = storageBuilder(fulls[i], mn, node.Options(
node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) {
return mock.NewMockSectorMgr(nil), nil
}),
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Override(new(ffiwrapper.Prover), mock.MockProver),
node.Unset(new(*sectorstorage.Manager)),
@ -564,9 +569,14 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes
opts = node.Options()
}
storers[i] = CreateTestStorageNode(ctx, t, genms[i].Worker, maddrs[i], pidKeys[i], f, mn, node.Options(
node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) {
node.Override(new(*mock.SectorMgr), func() (*mock.SectorMgr, error) {
return mock.NewMockSectorMgr(sectors), nil
}),
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Override(new(ffiwrapper.Prover), mock.MockProver),
node.Unset(new(*sectorstorage.Manager)),