Merge pull request #7761 from filecoin-project/jen/rcgain
build: release: v1.13.2-rc4
This commit is contained in:
commit
e05fdf8923
20
CHANGELOG.md
20
CHANGELOG.md
@ -1,16 +1,19 @@
|
||||
# Lotus changelog
|
||||
|
||||
# v1.13.2-rc3 / 2021-12-09
|
||||
# v1.13.2-rc4 / 2021-12-10
|
||||
|
||||
This is the second RC for lotus v1.13.2, with a dependency upgrade of ipld-legacy which will fix ChainGetNode. This
|
||||
is a highly recommended release with sealing pipeline fixes, worker management and scheduler enhancement, retrieval improvements and so on. More detailed changelog will be updated later.
|
||||
This is the 4th RC for lotus v1.13.2, with another retrieval enhancement that fills the gap that's brought by the release. This is a highly recommended release with sealing pipeline fixes, worker management and scheduler enhancement, retrieval improvements and so on. More detailed changelog will be updated later.
|
||||
|
||||
- github.com/filecoin-project/lotus:
|
||||
- CARv2 v2.1.0
|
||||
- dagstore pieceReader: Fix wrong ErrUnexpectedEOF return in ReadAt
|
||||
- dagstore pieceReader: Always read full in ReadAt
|
||||
- Add metrics to dagstore piecereader
|
||||
- disable building of appimage on release
|
||||
- stores: Reduce log spam during retrievals
|
||||
- fix lint
|
||||
- Fix mock ReadPiece
|
||||
- fr32: Reduce MTTresh from 32M to 512k per core
|
||||
- piecereader: Avoid allocating 1024MB slices per read
|
||||
- piecereader: Avoid redundant roundtrips when seeking
|
||||
- piecereader: Move closer to storage
|
||||
- build: release: v1.13.2-rc3 ([filecoin-project/lotus#7752](https://github.com/filecoin-project/lotus/pull/7752))
|
||||
- build: release: v1.13.2-rc2 ([filecoin-project/lotus#7745](https://github.com/filecoin-project/lotus/pull/7745))
|
||||
- v1.13.2-rc1 ([filecoin-project/lotus#7718](https://github.com/filecoin-project/lotus/pull/7718))
|
||||
- Address Scheduler enhancements (#7703) review ([filecoin-project/lotus#7714](https://github.com/filecoin-project/lotus/pull/7714))
|
||||
- Scheduler enhancements ([filecoin-project/lotus#7703](https://github.com/filecoin-project/lotus/pull/7703))
|
||||
@ -64,7 +67,6 @@ is a highly recommended release with sealing pipeline fixes, worker management a
|
||||
- Update execution image to maybe update debian version
|
||||
|
||||
|
||||
|
||||
# v1.13.1 / 2021-11-26
|
||||
|
||||
This is an optional Lotus v1.13.1 release.
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -37,7 +37,7 @@ func BuildTypeString() string {
|
||||
}
|
||||
|
||||
// BuildVersion is the local build version
|
||||
const BuildVersion = "1.13.2-rc3"
|
||||
const BuildVersion = "1.13.2-rc4"
|
||||
|
||||
func UserVersion() string {
|
||||
if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" {
|
||||
|
@ -7,7 +7,7 @@ USAGE:
|
||||
lotus-miner [global options] command [command options] [arguments...]
|
||||
|
||||
VERSION:
|
||||
1.13.2-rc3
|
||||
1.13.2-rc4
|
||||
|
||||
COMMANDS:
|
||||
init Initialize a lotus miner repo
|
||||
|
@ -7,7 +7,7 @@ USAGE:
|
||||
lotus-worker [global options] command [command options] [arguments...]
|
||||
|
||||
VERSION:
|
||||
1.13.2-rc3
|
||||
1.13.2-rc4
|
||||
|
||||
COMMANDS:
|
||||
run Start lotus worker
|
||||
|
@ -7,7 +7,7 @@ USAGE:
|
||||
lotus [global options] command [command options] [arguments...]
|
||||
|
||||
VERSION:
|
||||
1.13.2-rc3
|
||||
1.13.2-rc4
|
||||
|
||||
COMMANDS:
|
||||
daemon Start a lotus daemon process
|
||||
|
2
extern/sector-storage/fr32/fr32.go
vendored
2
extern/sector-storage/fr32/fr32.go
vendored
@ -8,7 +8,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
var MTTresh = uint64(32 << 20)
|
||||
var MTTresh = uint64(512 << 10)
|
||||
|
||||
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
|
||||
threads := (uint64(usz)) / MTTresh
|
||||
|
12
extern/sector-storage/fr32/readers.go
vendored
12
extern/sector-storage/fr32/readers.go
vendored
@ -16,13 +16,21 @@ type unpadReader struct {
|
||||
work []byte
|
||||
}
|
||||
|
||||
func BufSize(sz abi.PaddedPieceSize) int {
|
||||
return int(MTTresh * mtChunkCount(sz))
|
||||
}
|
||||
|
||||
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
|
||||
buf := make([]byte, BufSize(sz))
|
||||
|
||||
return NewUnpadReaderBuf(src, sz, buf)
|
||||
}
|
||||
|
||||
func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) {
|
||||
if err := sz.Validate(); err != nil {
|
||||
return nil, xerrors.Errorf("bad piece size: %w", err)
|
||||
}
|
||||
|
||||
buf := make([]byte, MTTresh*mtChunkCount(sz))
|
||||
|
||||
return &unpadReader{
|
||||
src: src,
|
||||
|
||||
|
15
extern/sector-storage/mock/mock.go
vendored
15
extern/sector-storage/mock/mock.go
vendored
@ -12,6 +12,7 @@ import (
|
||||
|
||||
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper"
|
||||
commcid "github.com/filecoin-project/go-fil-commcid"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
@ -384,12 +385,22 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
||||
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
|
||||
if uint64(offset) != 0 {
|
||||
panic("implme")
|
||||
}
|
||||
|
||||
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
|
||||
br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])
|
||||
|
||||
return struct {
|
||||
io.ReadCloser
|
||||
io.Seeker
|
||||
io.ReaderAt
|
||||
}{
|
||||
ReadCloser: ioutil.NopCloser(br),
|
||||
Seeker: br,
|
||||
ReaderAt: br,
|
||||
}, false, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
|
||||
|
112
extern/sector-storage/piece_provider.go
vendored
112
extern/sector-storage/piece_provider.go
vendored
@ -8,6 +8,7 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
@ -27,7 +28,7 @@ type PieceProvider interface {
|
||||
// default in most cases, but this might matter with future PoRep)
|
||||
// startOffset is added to the pieceOffset to get the starting reader offset.
|
||||
// The number of bytes that can be read is pieceSize-startOffset
|
||||
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
|
||||
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error)
|
||||
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
|
||||
}
|
||||
|
||||
@ -71,37 +72,91 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
|
||||
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
|
||||
//
|
||||
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
|
||||
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
|
||||
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
|
||||
// acquire a lock purely for reading unsealed sectors
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
|
||||
cancel()
|
||||
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
|
||||
return nil, xerrors.Errorf("acquiring read sector lock: %w", err)
|
||||
}
|
||||
|
||||
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
|
||||
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
|
||||
// The returned reader will be nil if none of the workers has an unsealed sector file containing
|
||||
// the unsealed piece.
|
||||
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
|
||||
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
|
||||
if err != nil {
|
||||
cancel()
|
||||
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
|
||||
cancel()
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
if r == nil {
|
||||
if rg == nil {
|
||||
cancel()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return r, cancel, nil
|
||||
buf := make([]byte, fr32.BufSize(size.Padded()))
|
||||
|
||||
pr, err := (&pieceReader{
|
||||
ctx: ctx,
|
||||
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
|
||||
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
|
||||
|
||||
r, err := rg(startOffsetAligned.Padded())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
|
||||
}
|
||||
|
||||
upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
|
||||
if err != nil {
|
||||
r.Close() // nolint
|
||||
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
|
||||
}
|
||||
|
||||
bir := bufio.NewReaderSize(upr, 127)
|
||||
if startOffset > uint64(startOffsetAligned) {
|
||||
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
|
||||
r.Close() // nolint
|
||||
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
}{
|
||||
Reader: bir,
|
||||
Closer: funcCloser(func() error {
|
||||
return r.Close()
|
||||
}),
|
||||
}, nil
|
||||
},
|
||||
len: size,
|
||||
onClose: cancel,
|
||||
pieceCid: pc,
|
||||
}).init()
|
||||
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pr, err
|
||||
}
|
||||
|
||||
type funcCloser func() error
|
||||
|
||||
func (f funcCloser) Close() error {
|
||||
return f()
|
||||
}
|
||||
|
||||
var _ io.Closer = funcCloser(nil)
|
||||
|
||||
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
|
||||
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
|
||||
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
|
||||
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
|
||||
// the returned boolean parameter will be set to true.
|
||||
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
|
||||
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
|
||||
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
|
||||
if err := pieceOffset.Valid(); err != nil {
|
||||
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
|
||||
}
|
||||
@ -109,9 +164,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
||||
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
|
||||
}
|
||||
|
||||
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
|
||||
|
||||
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
|
||||
r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
|
||||
|
||||
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
|
||||
|
||||
@ -142,7 +195,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
||||
|
||||
log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
|
||||
|
||||
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
|
||||
r, err = p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
|
||||
if err != nil {
|
||||
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
|
||||
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
|
||||
@ -156,34 +209,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
||||
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
|
||||
}
|
||||
|
||||
upr, err := fr32.NewUnpadReader(r, size.Padded())
|
||||
if err != nil {
|
||||
unlock()
|
||||
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
|
||||
}
|
||||
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
|
||||
|
||||
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)
|
||||
|
||||
bir := bufio.NewReaderSize(upr, 127)
|
||||
if startOffset > uint64(startOffsetAligned) {
|
||||
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
|
||||
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return &funcCloser{
|
||||
Reader: bir,
|
||||
close: func() error {
|
||||
err = r.Close()
|
||||
unlock()
|
||||
return err
|
||||
},
|
||||
}, uns, nil
|
||||
return r, uns, nil
|
||||
}
|
||||
|
||||
type funcCloser struct {
|
||||
io.Reader
|
||||
close func() error
|
||||
}
|
||||
|
||||
func (fc *funcCloser) Close() error { return fc.close() }
|
||||
|
2
extern/sector-storage/piece_provider_test.go
vendored
2
extern/sector-storage/piece_provider_test.go
vendored
@ -338,7 +338,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp
|
||||
|
||||
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
|
||||
expectedHadToUnseal bool, expectedBytes []byte) {
|
||||
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
|
||||
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, rd)
|
||||
require.Equal(t, expectedHadToUnseal, isUnsealed)
|
||||
|
@ -1,4 +1,4 @@
|
||||
package dagstore
|
||||
package sectorstorage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
@ -19,11 +19,14 @@ import (
|
||||
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
|
||||
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
|
||||
|
||||
type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error)
|
||||
|
||||
type pieceReader struct {
|
||||
ctx context.Context
|
||||
api MinerAPI
|
||||
getReader pieceGetter
|
||||
pieceCid cid.Cid
|
||||
len abi.UnpaddedPieceSize
|
||||
onClose context.CancelFunc
|
||||
|
||||
closed bool
|
||||
seqAt int64 // next byte to be read by io.Reader
|
||||
@ -37,10 +40,14 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
|
||||
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
|
||||
|
||||
p.rAt = 0
|
||||
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
||||
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if p.r == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
p.br = bufio.NewReaderSize(p.r, ReadBuf)
|
||||
|
||||
return p, nil
|
||||
@ -60,12 +67,19 @@ func (p *pieceReader) Close() error {
|
||||
}
|
||||
|
||||
if p.r != nil {
|
||||
if err := p.r.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.r.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
p.r = nil
|
||||
}
|
||||
|
||||
p.onClose()
|
||||
|
||||
p.closed = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -127,7 +141,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
}
|
||||
|
||||
p.rAt = off
|
||||
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
|
||||
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
|
||||
p.br = bufio.NewReaderSize(p.r, ReadBuf)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("getting backing reader: %w", err)
|
1
extern/sector-storage/stores/http_handler.go
vendored
1
extern/sector-storage/stores/http_handler.go
vendored
@ -84,7 +84,6 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
|
||||
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
|
||||
// returns an error if it does NOT have the required sector file/dir.
|
||||
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
|
||||
log.Infof("SERVE GET %s", r.URL)
|
||||
vars := mux.Vars(r)
|
||||
|
||||
id, err := storiface.ParseSectorID(vars["id"])
|
||||
|
69
extern/sector-storage/stores/remote.go
vendored
69
extern/sector-storage/stores/remote.go
vendored
@ -585,7 +585,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offse
|
||||
// 1. no worker(local worker included) has an unsealed file for the given sector OR
|
||||
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
|
||||
// Will return a nil reader and a nil error in such a case.
|
||||
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
|
||||
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
|
||||
ft := storiface.FTUnsealed
|
||||
|
||||
// check if we have the unsealed sector file locally
|
||||
@ -623,7 +623,52 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
||||
|
||||
if has {
|
||||
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
|
||||
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
|
||||
|
||||
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
|
||||
// don't reuse between readers unless closed
|
||||
f := pf
|
||||
pf = nil
|
||||
|
||||
if f == nil {
|
||||
f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("opening partial file: %w", err)
|
||||
}
|
||||
log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size)
|
||||
}
|
||||
|
||||
r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
}{
|
||||
Reader: r,
|
||||
Closer: funcCloser(func() error {
|
||||
// if we already have a reader cached, close this one
|
||||
if pf != nil {
|
||||
if f == nil {
|
||||
return nil
|
||||
}
|
||||
if pf == f {
|
||||
pf = nil
|
||||
}
|
||||
|
||||
tmp := f
|
||||
f = nil
|
||||
return tmp.Close()
|
||||
}
|
||||
|
||||
// otherwise stash it away for reuse
|
||||
pf = f
|
||||
return nil
|
||||
}),
|
||||
}, nil
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
|
||||
@ -666,16 +711,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
||||
continue
|
||||
}
|
||||
|
||||
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
|
||||
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
|
||||
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
|
||||
rd, err := r.readRemote(ctx, url, offset, size)
|
||||
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size)
|
||||
if err != nil {
|
||||
log.Warnw("reading from remote", "url", url, "error", err)
|
||||
lastErr = err
|
||||
continue
|
||||
return nil, err
|
||||
}
|
||||
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
|
||||
return rd, nil
|
||||
|
||||
return rd, err
|
||||
}, nil
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -692,3 +739,11 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
|
||||
}
|
||||
|
||||
var _ Store = &Remote{}
|
||||
|
||||
type funcCloser func() error
|
||||
|
||||
func (f funcCloser) Close() error {
|
||||
return f()
|
||||
}
|
||||
|
||||
var _ io.Closer = funcCloser(nil)
|
||||
|
16
extern/sector-storage/stores/remote_test.go
vendored
16
extern/sector-storage/stores/remote_test.go
vendored
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@ -470,12 +471,20 @@ func TestReader(t *testing.T) {
|
||||
|
||||
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
|
||||
|
||||
rd, err := remoteStore.Reader(ctx, sectorRef, offset, size)
|
||||
rdg, err := remoteStore.Reader(ctx, sectorRef, offset, size)
|
||||
var rd io.ReadCloser
|
||||
|
||||
if tc.errStr != "" {
|
||||
if rdg == nil {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, rdg)
|
||||
require.Contains(t, err.Error(), tc.errStr)
|
||||
} else {
|
||||
rd, err = rdg(0)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, rd)
|
||||
require.Contains(t, err.Error(), tc.errStr)
|
||||
}
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
@ -483,7 +492,10 @@ func TestReader(t *testing.T) {
|
||||
if !tc.expectedNonNilReader {
|
||||
require.Nil(t, rd)
|
||||
} else {
|
||||
require.NotNil(t, rd)
|
||||
require.NotNil(t, rdg)
|
||||
rd, err := rdg(0)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
require.NoError(t, rd.Close())
|
||||
}()
|
||||
|
@ -3,22 +3,22 @@ package dagstore
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/filecoin-project/dagstore/throttle"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/dagstore/throttle"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-markets/shared"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI
|
||||
|
||||
type MinerAPI interface {
|
||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error)
|
||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error)
|
||||
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
|
||||
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
|
||||
Start(ctx context.Context) error
|
||||
@ -27,7 +27,7 @@ type MinerAPI interface {
|
||||
type SectorAccessor interface {
|
||||
retrievalmarket.SectorAccessor
|
||||
|
||||
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
|
||||
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error)
|
||||
}
|
||||
|
||||
type minerAPI struct {
|
||||
@ -100,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
|
||||
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
|
||||
err := m.readyMgr.AwaitReady()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Throttle this path to avoid flooding the storage subsystem.
|
||||
@ -114,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
|
||||
}
|
||||
|
||||
if len(pieceInfo.Deals) == 0 {
|
||||
return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
|
||||
return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
|
||||
}
|
||||
|
||||
// prefer an unsealed sector containing the piece if one exists
|
||||
@ -126,7 +126,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
|
||||
deal := deal
|
||||
|
||||
// Throttle this path to avoid flooding the storage subsystem.
|
||||
var reader io.ReadCloser
|
||||
var reader mount.Reader
|
||||
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
|
||||
isUnsealed, err := m.sa.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
||||
if err != nil {
|
||||
@ -136,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
|
||||
return nil
|
||||
}
|
||||
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
|
||||
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
|
||||
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
||||
return err
|
||||
})
|
||||
|
||||
@ -147,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
|
||||
|
||||
if reader != nil {
|
||||
// we were able to obtain a reader for an already unsealed piece
|
||||
return reader, deal.Length.Unpadded(), nil
|
||||
return reader, nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,7 +158,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
|
||||
// block for a long time with the current PoRep
|
||||
//
|
||||
// This path is unthrottled.
|
||||
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
|
||||
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
||||
if err != nil {
|
||||
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
|
||||
log.Warn(lastErr.Error())
|
||||
@ -166,10 +166,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
|
||||
}
|
||||
|
||||
// Successfully fetched the deal data so return a reader over the data
|
||||
return reader, deal.Length.Unpadded(), nil
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
return nil, 0, lastErr
|
||||
return nil, lastErr
|
||||
}
|
||||
|
||||
func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
||||
@ -87,7 +88,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
|
||||
}
|
||||
|
||||
// Fetch the piece
|
||||
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
|
||||
r, err := api.FetchUnsealedPiece(ctx, cid1)
|
||||
if tc.expectErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
@ -159,7 +160,7 @@ func TestThrottle(t *testing.T) {
|
||||
errgrp, ctx := errgroup.WithContext(context.Background())
|
||||
for i := 0; i < 10; i++ {
|
||||
errgrp.Go(func() error {
|
||||
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
|
||||
r, err := api.FetchUnsealedPiece(ctx, cid1)
|
||||
if err == nil {
|
||||
_ = r.Close()
|
||||
}
|
||||
@ -203,10 +204,10 @@ type mockRPN struct {
|
||||
}
|
||||
|
||||
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length)
|
||||
return m.UnsealSectorAt(ctx, sectorID, offset, length)
|
||||
}
|
||||
|
||||
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
|
||||
atomic.AddInt32(&m.calls, 1)
|
||||
m.lk.RLock()
|
||||
defer m.lk.RUnlock()
|
||||
@ -215,7 +216,13 @@ func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber,
|
||||
if !ok {
|
||||
panic("sector not found")
|
||||
}
|
||||
return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil
|
||||
return struct {
|
||||
io.ReadCloser
|
||||
io.ReaderAt
|
||||
io.Seeker
|
||||
}{
|
||||
ReadCloser: io.NopCloser(bytes.NewBuffer([]byte(data[:]))),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
|
||||
|
@ -6,10 +6,9 @@ package mock_dagstore
|
||||
|
||||
import (
|
||||
context "context"
|
||||
io "io"
|
||||
reflect "reflect"
|
||||
|
||||
abi "github.com/filecoin-project/go-state-types/abi"
|
||||
mount "github.com/filecoin-project/dagstore/mount"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
@ -38,19 +37,18 @@ func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder {
|
||||
}
|
||||
|
||||
// FetchUnsealedPiece mocks base method.
|
||||
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
|
||||
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid) (mount.Reader, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2)
|
||||
ret0, _ := ret[0].(io.ReadCloser)
|
||||
ret1, _ := ret[1].(abi.UnpaddedPieceSize)
|
||||
ret2, _ := ret[2].(error)
|
||||
return ret0, ret1, ret2
|
||||
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1)
|
||||
ret0, _ := ret[0].(mount.Reader)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
|
||||
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1)
|
||||
}
|
||||
|
||||
// GetUnpaddedCARSize mocks base method.
|
||||
|
@ -56,11 +56,7 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
|
||||
}
|
||||
|
||||
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
|
||||
return (&pieceReader{
|
||||
ctx: ctx,
|
||||
api: l.API,
|
||||
pieceCid: l.PieceCid,
|
||||
}).init()
|
||||
return l.API.FetchUnsealedPiece(ctx, l.PieceCid)
|
||||
}
|
||||
|
||||
func (l *LotusMount) Info() mount.Info {
|
||||
|
@ -2,6 +2,7 @@ package dagstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"strings"
|
||||
@ -12,8 +13,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks"
|
||||
)
|
||||
|
||||
@ -30,8 +29,28 @@ func TestLotusMount(t *testing.T) {
|
||||
mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
|
||||
|
||||
mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
|
||||
|
||||
mr1 := struct {
|
||||
io.ReadCloser
|
||||
io.ReaderAt
|
||||
io.Seeker
|
||||
}{
|
||||
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
|
||||
ReaderAt: nil,
|
||||
Seeker: nil,
|
||||
}
|
||||
mr2 := struct {
|
||||
io.ReadCloser
|
||||
io.ReaderAt
|
||||
io.Seeker
|
||||
}{
|
||||
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
|
||||
ReaderAt: nil,
|
||||
Seeker: nil,
|
||||
}
|
||||
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr1, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr2, nil).Times(1)
|
||||
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
|
||||
|
||||
mnt, err := NewLotusMount(cid, mockLotusMountAPI)
|
||||
|
@ -6,9 +6,9 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/dagstore"
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
@ -127,17 +127,20 @@ type wrappedSA struct {
|
||||
retrievalmarket.SectorAccessor
|
||||
}
|
||||
|
||||
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
|
||||
r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if startOffset > 0 {
|
||||
if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil {
|
||||
return nil, xerrors.Errorf("discard start off: %w", err)
|
||||
}
|
||||
}
|
||||
return r, err
|
||||
return struct {
|
||||
io.ReadCloser
|
||||
io.Seeker
|
||||
io.ReaderAt
|
||||
}{
|
||||
ReadCloser: r,
|
||||
Seeker: nil,
|
||||
ReaderAt: nil,
|
||||
}, err
|
||||
}
|
||||
|
||||
var _ SectorAccessor = &wrappedSA{}
|
||||
|
@ -3,7 +3,6 @@ package dagstore
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
@ -12,8 +11,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/dagstore"
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/dagstore/shard"
|
||||
@ -192,7 +189,7 @@ func (m mockLotusMount) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
|
||||
func (m mockLotusMount) FetchUnsealedPiece(context.Context, cid.Cid) (mount.Reader, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
@ -4,8 +4,16 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/dagstore/mount"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
specstorage "github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/v1api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
@ -14,14 +22,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/markets/dagstore"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
specstorage "github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
)
|
||||
|
||||
var log = logging.Logger("sectoraccessor")
|
||||
@ -40,10 +40,10 @@ func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilde
|
||||
}
|
||||
|
||||
func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length)
|
||||
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, length)
|
||||
}
|
||||
|
||||
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
|
||||
log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length)
|
||||
si, err := sa.sectorsStatus(ctx, sectorID, false)
|
||||
if err != nil {
|
||||
@ -69,8 +69,8 @@ func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.Secto
|
||||
}
|
||||
|
||||
// Get a reader for the piece, unsealing the piece if necessary
|
||||
log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid)
|
||||
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD)
|
||||
log.Debugf("read piece in sector %d, pieceOffset %d, length %d from miner %d", sectorID, pieceOffset, length, mid)
|
||||
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), length, si.Ticket.Value, commD)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user