bypass task scheduler for reading unsealed pieces

This commit is contained in:
aarshkshah1992 2021-05-18 13:02:30 +05:30 committed by Dirk McCormick
parent 2d4eaf08c4
commit 2a40c802ea
12 changed files with 459 additions and 173 deletions

View File

@ -11,6 +11,7 @@ import (
"os" "os"
"runtime" "runtime"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -66,7 +67,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
} }
var done func() var done func()
var stagedFile *partialFile var stagedFile *partialfile.PartialFile
defer func() { defer func() {
if done != nil { if done != nil {
@ -87,7 +88,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
} }
stagedFile, err = createPartialFile(maxPieceSize, stagedPath.Unsealed) stagedFile, err = partialfile.CreatePartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err) return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
} }
@ -97,7 +98,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
} }
stagedFile, err = openPartialFile(maxPieceSize, stagedPath.Unsealed) stagedFile, err = partialfile.OpenPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err) return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err)
} }
@ -257,7 +258,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
// try finding existing // try finding existing
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage) unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
var pf *partialFile var pf *partialfile.PartialFile
switch { switch {
case xerrors.Is(err, storiface.ErrSectorNotFound): case xerrors.Is(err, storiface.ErrSectorNotFound):
@ -267,7 +268,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
} }
defer done() defer done()
pf, err = createPartialFile(maxPieceSize, unsealedPath.Unsealed) pf, err = partialfile.CreatePartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil { if err != nil {
return xerrors.Errorf("create unsealed file: %w", err) return xerrors.Errorf("create unsealed file: %w", err)
} }
@ -275,7 +276,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
case err == nil: case err == nil:
defer done() defer done()
pf, err = openPartialFile(maxPieceSize, unsealedPath.Unsealed) pf, err = partialfile.OpenPartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil { if err != nil {
return xerrors.Errorf("opening partial file: %w", err) return xerrors.Errorf("opening partial file: %w", err)
} }
@ -427,7 +428,7 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storag
} }
maxPieceSize := abi.PaddedPieceSize(ssize) maxPieceSize := abi.PaddedPieceSize(ssize)
pf, err := openPartialFile(maxPieceSize, path.Unsealed) pf, err := partialfile.OpenPartialFile(maxPieceSize, path.Unsealed)
if err != nil { if err != nil {
if xerrors.Is(err, os.ErrNotExist) { if xerrors.Is(err, os.ErrNotExist) {
return false, nil return false, nil
@ -589,7 +590,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
if len(keepUnsealed) > 0 { if len(keepUnsealed) > 0 {
sr := pieceRun(0, maxPieceSize) sr := partialfile.PieceRun(0, maxPieceSize)
for _, s := range keepUnsealed { for _, s := range keepUnsealed {
si := &rlepluslazy.RunSliceIterator{} si := &rlepluslazy.RunSliceIterator{}
@ -611,7 +612,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
} }
defer done() defer done()
pf, err := openPartialFile(maxPieceSize, paths.Unsealed) pf, err := partialfile.OpenPartialFile(maxPieceSize, paths.Unsealed)
if err == nil { if err == nil {
var at uint64 var at uint64
for sr.HasNext() { for sr.HasNext() {

View File

@ -1,6 +1,7 @@
package ffiwrapper package ffiwrapper
import ( import (
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"golang.org/x/xerrors" "golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle" rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
@ -17,7 +18,7 @@ const mergeGaps = 32 << 20
// TODO const expandRuns = 16 << 20 // unseal more than requested for future requests // TODO const expandRuns = 16 << 20 // unseal more than requested for future requests
func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) { func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) {
todo := pieceRun(offset.Padded(), size.Padded()) todo := partialfile.PieceRun(offset.Padded(), size.Padded())
todo, err := rlepluslazy.Subtract(todo, unsealed) todo, err := rlepluslazy.Subtract(todo, unsealed)
if err != nil { if err != nil {
return nil, xerrors.Errorf("compute todo-unsealed: %w", err) return nil, xerrors.Errorf("compute todo-unsealed: %w", err)

View File

@ -51,13 +51,12 @@ func (r *unpadReader) Read(out []byte) (int, error) {
r.left -= uint64(todo) r.left -= uint64(todo)
n, err := r.src.Read(r.work[:todo]) n, err := io.ReadAtLeast(r.src, r.work[:todo], int(todo))
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return n, err return n, err
} }
if n < int(todo) {
if n != int(todo) { return 0, xerrors.Errorf("didn't read enough: %d / %d, left %d, out %d", n, todo, r.left, len(out))
return 0, xerrors.Errorf("didn't read enough: %w", err)
} }
Unpad(r.work[:todo], out[:todo.Unpadded()]) Unpad(r.work[:todo], out[:todo.Unpadded()])

View File

@ -47,8 +47,6 @@ type Worker interface {
} }
type SectorManager interface { type SectorManager interface {
ReadPiece(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ffiwrapper.StorageSealer ffiwrapper.StorageSealer
storage.Prover storage.Prover
storiface.WorkerReturn storiface.WorkerReturn
@ -206,71 +204,7 @@ func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileTy
} }
} }
func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error { func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error {
return func(ctx context.Context, w Worker) error {
log.Debugf("read piece data from sector %d, offset %d, size %d", sector.ID, offset, size)
r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size))
if err != nil {
return err
}
if r != nil {
*rok = r.(bool)
}
log.Debugf("completed read piece data from sector %d, offset %d, size %d: read ok? %t", sector.ID, offset, size, *rok)
return nil
}
}
func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
defer cancel()
log.Debugf("acquire read sector lock for sector %d", sector.ID)
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
returnErr = xerrors.Errorf("acquiring read sector lock: %w", err)
return
}
log.Debugf("find unsealed sector %d", sector.ID)
// passing 0 spt because we only need it when allowFetch is true
best, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
return
}
foundUnsealed = len(best) > 0
if foundUnsealed { // append to existing
// There is unsealed sector, see if we can read from it
log.Debugf("found unsealed sector %d", sector.ID)
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
log.Debugf("scheduling read of unsealed sector %d", sector.ID)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
m.readPiece(sink, sector, offset, size, &readOk))
if err != nil {
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
}
} else {
log.Debugf("did not find unsealed sector %d", sector.ID)
selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing)
}
return
}
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
log.Debugf("fetch and read piece in sector %d, offset %d, size %d", sector.ID, offset, size)
foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size)
if err != nil {
return err
}
if readOk {
log.Debugf("completed read of unsealed piece in sector %d, offset %d, size %d", sector.ID, offset, size)
return nil
}
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -279,22 +213,16 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
return xerrors.Errorf("acquiring unseal sector lock: %w", err) return xerrors.Errorf("acquiring unseal sector lock: %w", err)
} }
unsealFetch := func(ctx context.Context, worker Worker) error { sealFetch := func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err) return xerrors.Errorf("copy sealed/cache sector data: %w", err)
} }
if foundUnsealed {
log.Debugf("copy unsealed sector data for sector %d", sector.ID)
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil {
return xerrors.Errorf("copy unsealed sector data: %w", err)
}
}
return nil return nil
} }
if unsealed == cid.Undef { if unsealed == nil {
return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size) return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size)
} }
@ -303,15 +231,17 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
return xerrors.Errorf("getting sector size: %w", err) return xerrors.Errorf("getting sector size: %w", err)
} }
selector := newExistingSelector(m.index, sector.ID, storiface.FTSealed|storiface.FTCache, true)
log.Debugf("schedule unseal for sector %d", sector.ID) log.Debugf("schedule unseal for sector %d", sector.ID)
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, sealFetch, func(ctx context.Context, w Worker) error {
// TODO: make restartable // TODO: make restartable
// NOTE: we're unsealing the whole sector here as with SDR we can't really // NOTE: we're unsealing the whole sector here as with SDR we can't really
// unseal the sector partially. Requesting the whole sector here can // unseal the sector partially. Requesting the whole sector here can
// save us some work in case another piece is requested from here // save us some work in case another piece is requested from here
log.Debugf("unseal sector %d", sector.ID) log.Debugf("unseal sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, unsealed)) _, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, *unsealed))
log.Debugf("completed unseal sector %d", sector.ID) log.Debugf("completed unseal sector %d", sector.ID)
return err return err
}) })
@ -319,20 +249,6 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
return err return err
} }
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
log.Debugf("schedule read piece for sector %d, offset %d, size %d", sector.ID, offset, size)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
m.readPiece(sink, sector, offset, size, &readOk))
if err != nil {
return xerrors.Errorf("reading piece from sealed sector: %w", err)
}
if !readOk {
return xerrors.Errorf("failed to read unsealed piece")
}
log.Debugf("completed read of piece in sector %d, offset %d, size %d", sector.ID, offset, size)
return nil return nil
} }
@ -767,4 +683,5 @@ func (m *Manager) Close(ctx context.Context) error {
return m.sched.Close(ctx) return m.sched.Close(ctx)
} }
var _ Unsealer = &Manager{}
var _ SectorManager = &Manager{} var _ SectorManager = &Manager{}

View File

@ -1,4 +1,4 @@
package ffiwrapper package partialfile
import ( import (
"encoding/binary" "encoding/binary"
@ -7,6 +7,7 @@ import (
"syscall" "syscall"
"github.com/detailyang/go-fallocate" "github.com/detailyang/go-fallocate"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle" rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
@ -16,6 +17,8 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
var log = logging.Logger("partialfile")
const veryLargeRle = 1 << 20 const veryLargeRle = 1 << 20
// Sectors can be partially unsealed. We support this by appending a small // Sectors can be partially unsealed. We support this by appending a small
@ -25,7 +28,7 @@ const veryLargeRle = 1 << 20
// unsealed sector files internally have this structure // unsealed sector files internally have this structure
// [unpadded (raw) data][rle+][4B LE length fo the rle+ field] // [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
type partialFile struct { type PartialFile struct {
maxPiece abi.PaddedPieceSize maxPiece abi.PaddedPieceSize
path string path string
@ -57,7 +60,7 @@ func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) err
return w.Truncate(maxPieceSize + int64(rb) + 4) return w.Truncate(maxPieceSize + int64(rb) + 4)
} }
func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) { func CreatePartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint
if err != nil { if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err) return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
@ -89,10 +92,10 @@ func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialF
return nil, xerrors.Errorf("close empty partial file: %w", err) return nil, xerrors.Errorf("close empty partial file: %w", err)
} }
return openPartialFile(maxPieceSize, path) return OpenPartialFile(maxPieceSize, path)
} }
func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) { func OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint
if err != nil { if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err) return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
@ -165,7 +168,7 @@ func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFil
return nil, err return nil, err
} }
return &partialFile{ return &PartialFile{
maxPiece: maxPieceSize, maxPiece: maxPieceSize,
path: path, path: path,
allocated: rle, allocated: rle,
@ -173,11 +176,11 @@ func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFil
}, nil }, nil
} }
func (pf *partialFile) Close() error { func (pf *PartialFile) Close() error {
return pf.file.Close() return pf.file.Close()
} }
func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) { func (pf *PartialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err) return nil, xerrors.Errorf("seek piece start: %w", err)
} }
@ -188,7 +191,7 @@ func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedP
return nil, err return nil, err
} }
and, err := rlepluslazy.And(have, pieceRun(offset, size)) and, err := rlepluslazy.And(have, PieceRun(offset, size))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -206,13 +209,13 @@ func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedP
return pf.file, nil return pf.file, nil
} }
func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error { func (pf *PartialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator() have, err := pf.allocated.RunIterator()
if err != nil { if err != nil {
return err return err
} }
ored, err := rlepluslazy.Or(have, pieceRun(offset, size)) ored, err := rlepluslazy.Or(have, PieceRun(offset, size))
if err != nil { if err != nil {
return err return err
} }
@ -224,7 +227,7 @@ func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.
return nil return nil
} }
func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error { func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator() have, err := pf.allocated.RunIterator()
if err != nil { if err != nil {
return err return err
@ -234,7 +237,7 @@ func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie
return xerrors.Errorf("deallocating: %w", err) return xerrors.Errorf("deallocating: %w", err)
} }
s, err := rlepluslazy.Subtract(have, pieceRun(offset, size)) s, err := rlepluslazy.Subtract(have, PieceRun(offset, size))
if err != nil { if err != nil {
return err return err
} }
@ -246,7 +249,7 @@ func (pf *partialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPie
return nil return nil
} }
func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) { func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err) return nil, xerrors.Errorf("seek piece start: %w", err)
} }
@ -257,7 +260,7 @@ func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP
return nil, err return nil, err
} }
and, err := rlepluslazy.And(have, pieceRun(offset, size)) and, err := rlepluslazy.And(have, PieceRun(offset, size))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -275,17 +278,17 @@ func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedP
return pf.file, nil return pf.file, nil
} }
func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) { func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) {
return pf.allocated.RunIterator() return pf.allocated.RunIterator()
} }
func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { func (pf *PartialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
have, err := pf.Allocated() have, err := pf.Allocated()
if err != nil { if err != nil {
return false, err return false, err
} }
u, err := rlepluslazy.And(have, pieceRun(offset.Padded(), size.Padded())) u, err := rlepluslazy.And(have, PieceRun(offset.Padded(), size.Padded()))
if err != nil { if err != nil {
return false, err return false, err
} }
@ -298,7 +301,7 @@ func (pf *partialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi
return abi.PaddedPieceSize(uc) == size.Padded(), nil return abi.PaddedPieceSize(uc) == size.Padded(), nil
} }
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator { func PieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
var runs []rlepluslazy.Run var runs []rlepluslazy.Run
if offset > 0 { if offset > 0 {
runs = append(runs, rlepluslazy.Run{ runs = append(runs, rlepluslazy.Run{

117
extern/sector-storage/piece_provider.go vendored Normal file
View File

@ -0,0 +1,117 @@
package sectorstorage
import (
"bufio"
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fr32"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type Unsealer interface {
SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error
}
type PieceProvider interface {
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
}
type pieceProvider struct {
storage *stores.Remote
index stores.SectorIndex
uns Unsealer
}
func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unsealer) PieceProvider {
return &pieceProvider{
storage: storage,
index: index,
uns: uns,
}
}
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
if err != nil {
cancel()
return nil, nil, err
}
if r == nil {
cancel()
}
return r, cancel, nil
}
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := offset.Valid(); err != nil {
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
err = nil
}
if err != nil {
return nil, false, err
}
var uns bool
if r == nil {
uns = true
commd := &unsealed
if unsealed == cid.Undef {
commd = nil
}
if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil {
return nil, false, xerrors.Errorf("unsealing piece: %w", err)
}
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size)
if err != nil {
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
}
if r == nil {
return nil, true, xerrors.Errorf("got no reader after unsealing piece")
}
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}
return &funcCloser{
Reader: bufio.NewReaderSize(upr, 127),
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
}
type funcCloser struct {
io.Reader
close func() error
}
func (fc *funcCloser) Close() error { return fc.close() }

View File

@ -5,7 +5,10 @@ import (
"io" "io"
"net/http" "net/http"
"os" "os"
"strconv"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/gorilla/mux" "github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -29,6 +32,8 @@ func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET") mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE") mux.HandleFunc("/remote/{type}/{id}", handler.remoteDeleteSector).Methods("DELETE")
mux.HandleFunc("/remote/{type}/{id}/{spt}/allocated/{offset}/{size}", handler.remoteGetAllocated).Methods("GET")
mux.ServeHTTP(w, r) mux.ServeHTTP(w, r)
} }
@ -73,7 +78,6 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
} }
// The caller has a lock on this sector already, no need to get one here // The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything // passing 0 spt because we don't allocate anything
si := storage.SectorRef{ si := storage.SectorRef{
ID: id, ID: id,
@ -103,32 +107,30 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
return return
} }
var rd io.Reader
if stat.IsDir() { if stat.IsDir() {
rd, err = tarutil.TarDirectory(path) if _, has := r.Header["Range"]; has {
w.Header().Set("Content-Type", "application/x-tar") log.Error("Range not supported on directories")
} else { w.WriteHeader(500)
rd, err = os.OpenFile(path, os.O_RDONLY, 0644) // nolint return
w.Header().Set("Content-Type", "application/octet-stream")
} }
rd, err := tarutil.TarDirectory(path)
if err != nil { if err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
if !stat.IsDir() {
defer func() {
if err := rd.(*os.File).Close(); err != nil {
log.Errorf("closing source file: %+v", err)
}
}()
}
w.Header().Set("Content-Type", "application/x-tar")
w.WriteHeader(200) w.WriteHeader(200)
if _, err := io.CopyBuffer(w, rd, make([]byte, CopyBuf)); err != nil { if _, err := io.CopyBuffer(w, rd, make([]byte, CopyBuf)); err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
return return
} }
} else {
w.Header().Set("Content-Type", "application/octet-stream")
http.ServeFile(w, r, path)
}
} }
func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) { func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.Request) {
@ -156,6 +158,104 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
} }
} }
func (handler *FetchHandler) remoteGetAllocated(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE Alloc check %s", r.URL)
vars := mux.Vars(r)
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
ft, err := ftFromString(vars["type"])
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
if ft != storiface.FTUnsealed {
log.Errorf("/allocated only supports unsealed sector files")
w.WriteHeader(500)
return
}
spti, err := strconv.ParseInt(vars["spt"], 10, 64)
if err != nil {
log.Errorf("parsing spt: %+v", err)
w.WriteHeader(500)
return
}
spt := abi.RegisteredSealProof(spti)
ssize, err := spt.SectorSize()
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
offi, err := strconv.ParseInt(vars["offset"], 10, 64)
if err != nil {
log.Errorf("parsing offset: %+v", err)
w.WriteHeader(500)
return
}
szi, err := strconv.ParseInt(vars["size"], 10, 64)
if err != nil {
log.Errorf("parsing spt: %+v", err)
w.WriteHeader(500)
return
}
// The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything
si := storage.SectorRef{
ID: id,
ProofType: 0,
}
paths, _, err := handler.Local.AcquireSector(r.Context(), si, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return
}
path := storiface.PathByType(paths, ft)
if path == "" {
log.Error("acquired path was empty")
w.WriteHeader(500)
return
}
pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
log.Error("opening partial file: ", err)
w.WriteHeader(500)
return
}
defer func() {
if err := pf.Close(); err != nil {
log.Error("close partial file: ", err)
}
}()
has, err := pf.HasAllocated(storiface.UnpaddedByteIndex(offi), abi.UnpaddedPieceSize(szi))
if err != nil {
log.Error("has allocated: ", err)
w.WriteHeader(500)
return
}
if has {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
}
func ftFromString(t string) (storiface.SectorFileType, error) { func ftFromString(t string) (storiface.SectorFileType, error) {
switch t { switch t {
case storiface.FTUnsealed.String(): case storiface.FTUnsealed.String():

View File

@ -3,6 +3,7 @@ package stores
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math/bits" "math/bits"
@ -16,6 +17,7 @@ import (
"sync" "sync"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil" "github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
@ -415,4 +417,145 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
return out, nil return out, nil
} }
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded())
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return false, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth.Clone()
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return false, xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close() // nolint
switch resp.StatusCode {
case http.StatusOK:
return true, nil
case http.StatusRequestedRangeNotSatisfiable:
return false, nil
default:
return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode)
}
}
func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
if len(r.limit) >= cap(r.limit) {
log.Infof("Throttling remote read, %d already running", len(r.limit))
}
// TODO: Smarter throttling
// * Priority (just going sequentially is still pretty good)
// * Per interface
// * Aware of remote load
select {
case r.limit <- struct{}{}:
defer func() { <-r.limit }()
case <-ctx.Done():
return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, xerrors.Errorf("request: %w", err)
}
req.Header = r.auth.Clone()
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, xerrors.Errorf("do request: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close() // nolint
return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
return resp.Body, nil
}
// Reader gets a reader for unsealed file range. Can return nil in case the requested range isn't allocated in the file
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
ft := storiface.FTUnsealed
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire local: %w", err)
}
path := storiface.PathByType(paths, ft)
var rd io.ReadCloser
if path == "" {
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return nil, err
}
if len(si) == 0 {
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more preferred to store ?
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})
iloop:
for _, info := range si {
for _, url := range info.URLs {
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
continue
}
if !ok {
continue
}
rd, err = r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
break iloop
}
}
} else {
log.Infof("Read local %s (+%d,%d)", path, offset, size)
ssize, err := s.ProofType.SectorSize()
if err != nil {
return nil, err
}
pf, err := partialfile.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
has, err := pf.HasAllocated(storiface.UnpaddedByteIndex(offset.Unpadded()), size.Unpadded())
if err != nil {
return nil, xerrors.Errorf("has allocated: %w", err)
}
if !has {
if err := pf.Close(); err != nil {
return nil, xerrors.Errorf("close partial file: %w", err)
}
return nil, nil
}
return pf.Reader(storiface.PaddedByteIndex(offset), size)
}
// note: rd can be nil
return rd, nil
}
var _ Store = &Remote{} var _ Store = &Remote{}

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
) )
@ -17,6 +18,14 @@ func (i UnpaddedByteIndex) Padded() PaddedByteIndex {
return PaddedByteIndex(abi.UnpaddedPieceSize(i).Padded()) return PaddedByteIndex(abi.UnpaddedPieceSize(i).Padded())
} }
func (i UnpaddedByteIndex) Valid() error {
if i%127 != 0 {
return xerrors.Errorf("unpadded byte index must be a multiple of 127")
}
return nil
}
type PaddedByteIndex uint64 type PaddedByteIndex uint64
type RGetter func(ctx context.Context, id abi.SectorID) (cid.Cid, error) type RGetter func(ctx context.Context, id abi.SectorID) (cid.Cid, error)

View File

@ -5,6 +5,7 @@ import (
"io" "io"
"github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v1api"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -26,14 +27,14 @@ var log = logging.Logger("retrievaladapter")
type retrievalProviderNode struct { type retrievalProviderNode struct {
miner *storage.Miner miner *storage.Miner
sealer sectorstorage.SectorManager pp sectorstorage.PieceProvider
full v1api.FullNode full v1api.FullNode
} }
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the // NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node // Lotus Node
func NewRetrievalProviderNode(miner *storage.Miner, sealer sectorstorage.SectorManager, full v1api.FullNode) retrievalmarket.RetrievalProviderNode { func NewRetrievalProviderNode(miner *storage.Miner, pp sectorstorage.PieceProvider, full v1api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sealer, full} return &retrievalProviderNode{miner, pp, full}
} }
func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) { func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
@ -67,24 +68,18 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi
ProofType: si.SectorType, ProofType: si.SectorType,
} }
// Set up a pipe so that data can be written from the unsealing process
// into the reader returned by this function
r, w := io.Pipe()
go func() {
var commD cid.Cid var commD cid.Cid
if si.CommD != nil { if si.CommD != nil {
commD = *si.CommD commD = *si.CommD
} }
// Read the piece into the pipe's writer, unsealing the piece if necessary // Get a reader for the piece, unsealing the piece if necessary
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid) log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
err := rpn.sealer.ReadPiece(ctx, w, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD) r, unsealed, err := rpn.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD)
if err != nil { if err != nil {
log.Errorf("failed to unseal piece from sector %d: %s", sectorID, err) return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
} }
// Close the reader with any error that was returned while reading the piece _ = unsealed // todo: use
_ = w.CloseWithError(err)
}()
return r, nil return r, nil
} }

View File

@ -378,6 +378,7 @@ var MinerNode = Options(
Override(new(*sectorstorage.Manager), modules.SectorStorage), Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))), Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))),
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
// Sector storage: Proofs // Sector storage: Proofs
Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier), Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier),
@ -404,6 +405,7 @@ var MinerNode = Options(
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
// Markets (retrieval) // Markets (retrieval)
Override(new(sectorstorage.PieceProvider), sectorstorage.NewPieceProvider),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(HandleRetrievalKey, modules.HandleRetrieval), Override(HandleRetrievalKey, modules.HandleRetrieval),

View File

@ -641,11 +641,10 @@ func RetrievalProvider(h host.Host,
pieceStore dtypes.ProviderPieceStore, pieceStore dtypes.ProviderPieceStore,
mds dtypes.StagingMultiDstore, mds dtypes.StagingMultiDstore,
dt dtypes.ProviderDataTransfer, dt dtypes.ProviderDataTransfer,
onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, pieceProvider sectorstorage.PieceProvider,
offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc,
userFilter dtypes.RetrievalDealFilter, userFilter dtypes.RetrievalDealFilter,
) (retrievalmarket.RetrievalProvider, error) { ) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full) adapter := retrievaladapter.NewRetrievalProviderNode(miner, pieceProvider, full)
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {