Merge branch 'master' into deps/update-ctx-dsbs

This commit is contained in:
Aayush Rajasekaran 2021-12-14 18:08:05 -05:00
commit 0e2278cc76
23 changed files with 317 additions and 153 deletions

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"fmt"
"os"
gruntime "runtime"
"time"
@ -52,7 +53,7 @@ func (m *Message) ValueReceived() abi.TokenAmount {
}
// EnableGasTracing, if true, outputs gas tracing in execution traces.
var EnableGasTracing = false
var EnableGasTracing = os.Getenv("LOTUS_VM_ENABLE_GAS_TRACING_VERY_SLOW") == "1"
type Runtime struct {
rt5.Message

View File

@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"sort"
"text/tabwriter"
"github.com/docker/go-units"
@ -137,6 +138,10 @@ var retrievalDealsListCmd = &cli.Command{
return err
}
sort.Slice(deals, func(i, j int) bool {
return deals[i].ID < deals[j].ID
})
w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintf(w, "Receiver\tDealID\tPayload\tState\tPricePerByte\tBytesSent\tMessage\n")

View File

@ -8,7 +8,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
)
var MTTresh = uint64(32 << 20)
var MTTresh = uint64(512 << 10)
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := (uint64(usz)) / MTTresh

View File

@ -16,13 +16,21 @@ type unpadReader struct {
work []byte
}
func BufSize(sz abi.PaddedPieceSize) int {
return int(MTTresh * mtChunkCount(sz))
}
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
buf := make([]byte, BufSize(sz))
return NewUnpadReaderBuf(src, sz, buf)
}
func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}
buf := make([]byte, MTTresh*mtChunkCount(sz))
return &unpadReader{
src: src,

View File

@ -12,6 +12,7 @@ import (
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/dagstore/mount"
ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
@ -384,12 +385,22 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
}
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])
return struct {
io.ReadCloser
io.Seeker
io.ReaderAt
}{
ReadCloser: ioutil.NopCloser(br),
Seeker: br,
ReaderAt: br,
}, false, nil
}
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {

View File

@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
@ -27,7 +28,7 @@ type PieceProvider interface {
// default in most cases, but this might matter with future PoRep)
// startOffset is added to the pieceOffset to get the starting reader offset.
// The number of bytes that can be read is pieceSize-startOffset
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}
@ -71,37 +72,91 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
return nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
if err != nil {
cancel()
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
cancel()
return nil, nil, err
return nil, err
}
if r == nil {
if rg == nil {
cancel()
return nil, nil
}
return r, cancel, nil
buf := make([]byte, fr32.BufSize(size.Padded()))
pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
r, err := rg(startOffsetAligned.Padded())
if err != nil {
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}
upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
}
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
return struct {
io.Reader
io.Closer
}{
Reader: bir,
Closer: funcCloser(func() error {
return r.Close()
}),
}, nil
},
len: size,
onClose: cancel,
pieceCid: pc,
}).init()
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
cancel()
return nil, err
}
return pr, err
}
type funcCloser func() error
func (f funcCloser) Close() error {
return f()
}
var _ io.Closer = funcCloser(nil)
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
}
@ -109,9 +164,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127
r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
@ -142,7 +195,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err = p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
@ -156,34 +209,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
}
upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
unlock()
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)
bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
return &funcCloser{
Reader: bir,
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
return r, uns, nil
}
type funcCloser struct {
io.Reader
close func() error
}
func (fc *funcCloser) Close() error { return fc.close() }

View File

@ -338,7 +338,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
require.NoError(t, err)
require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed)

View File

@ -1,4 +1,4 @@
package dagstore
package sectorstorage
import (
"bufio"
@ -19,11 +19,14 @@ import (
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error)
type pieceReader struct {
ctx context.Context
api MinerAPI
pieceCid cid.Cid
len abi.UnpaddedPieceSize
ctx context.Context
getReader pieceGetter
pieceCid cid.Cid
len abi.UnpaddedPieceSize
onClose context.CancelFunc
closed bool
seqAt int64 // next byte to be read by io.Reader
@ -37,10 +40,14 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
p.rAt = 0
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
if err != nil {
return nil, err
}
if p.r == nil {
return nil, nil
}
p.br = bufio.NewReaderSize(p.r, ReadBuf)
return p, nil
@ -60,12 +67,19 @@ func (p *pieceReader) Close() error {
}
if p.r != nil {
if err := p.r.Close(); err != nil {
return err
}
if err := p.r.Close(); err != nil {
return err
}
p.r = nil
}
p.onClose()
p.closed = true
return nil
}
@ -127,7 +141,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
}
p.rAt = off
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
p.br = bufio.NewReaderSize(p.r, ReadBuf)
if err != nil {
return 0, xerrors.Errorf("getting backing reader: %w", err)

View File

@ -84,7 +84,6 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
// returns an error if it does NOT have the required sector file/dir.
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r)
id, err := storiface.ParseSectorID(vars["id"])

View File

@ -585,7 +585,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offse
// 1. no worker(local worker included) has an unsealed file for the given sector OR
// 2. no worker(local worker included) has the unsealed piece in their unsealed sector file.
// Will return a nil reader and a nil error in such a case.
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) {
ft := storiface.FTUnsealed
// check if we have the unsealed sector file locally
@ -623,7 +623,52 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
if has {
log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size)
return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size)
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// don't reuse between readers unless closed
f := pf
pf = nil
if f == nil {
f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path)
if err != nil {
return nil, xerrors.Errorf("opening partial file: %w", err)
}
log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size)
}
r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned))
if err != nil {
return nil, err
}
return struct {
io.Reader
io.Closer
}{
Reader: r,
Closer: funcCloser(func() error {
// if we already have a reader cached, close this one
if pf != nil {
if f == nil {
return nil
}
if pf == f {
pf = nil
}
tmp := f
f = nil
return tmp.Close()
}
// otherwise stash it away for reuse
pf = f
return nil
}),
}, nil
}, nil
}
log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size)
@ -666,16 +711,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
continue
}
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
lastErr = err
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
return rd, nil
return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) {
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
return nil, err
}
return rd, err
}, nil
}
}
@ -692,3 +739,11 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
}
var _ Store = &Remote{}
type funcCloser func() error
func (f funcCloser) Close() error {
return f()
}
var _ io.Closer = funcCloser(nil)

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -470,12 +471,20 @@ func TestReader(t *testing.T) {
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
rd, err := remoteStore.Reader(ctx, sectorRef, offset, size)
rdg, err := remoteStore.Reader(ctx, sectorRef, offset, size)
var rd io.ReadCloser
if tc.errStr != "" {
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
if rdg == nil {
require.Error(t, err)
require.Nil(t, rdg)
require.Contains(t, err.Error(), tc.errStr)
} else {
rd, err = rdg(0)
require.Error(t, err)
require.Nil(t, rd)
require.Contains(t, err.Error(), tc.errStr)
}
} else {
require.NoError(t, err)
}
@ -483,7 +492,10 @@ func TestReader(t *testing.T) {
if !tc.expectedNonNilReader {
require.Nil(t, rd)
} else {
require.NotNil(t, rd)
require.NotNil(t, rdg)
rd, err := rdg(0)
require.NoError(t, err)
defer func() {
require.NoError(t, rd.Close())
}()

7
go.mod
View File

@ -90,6 +90,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.6
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-log/v2 v2.4.0
github.com/ipfs/go-merkledag v0.5.1
github.com/ipfs/go-metrics-interface v0.0.1
@ -156,14 +157,16 @@ require (
go.uber.org/fx v1.9.0
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac
golang.org/x/sys v0.0.0-20211209171907-798191bca915
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
golang.org/x/tools v0.1.5
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/cheggaaa/pb.v1 v1.0.28
gotest.tools v2.2.0+incompatible
lukechampine.com/blake3 v1.1.7 // indirect
)
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi

16
go.sum
View File

@ -774,8 +774,9 @@ github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dC
github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k=
github.com/ipfs/go-ipld-format v0.2.0 h1:xGlJKkArkmBvowr+GMCX0FEZtkro71K1AwiKnL37mwA=
github.com/ipfs/go-ipld-format v0.2.0/go.mod h1:3l3C1uKoadTPbeNfrDi+xMInYKlx2Cvg1BuydPSdzQs=
github.com/ipfs/go-ipld-legacy v0.1.0 h1:wxkkc4k8cnvIGIjPO0waJCe7SHEyFgl+yQdafdjGrpA=
github.com/ipfs/go-ipld-legacy v0.1.0/go.mod h1:86f5P/srAmh9GcIcWQR9lfFLZPrIyyXQeVlOWeeWEuI=
github.com/ipfs/go-ipld-legacy v0.1.1 h1:BvD8PEuqwBHLTKqlGFTHSwrwFOMkVESEvwIYwR2cdcc=
github.com/ipfs/go-ipld-legacy v0.1.1/go.mod h1:8AyKFCjgRPsQFf15ZQgDB8Din4DML/fOmKZkkFkrIEg=
github.com/ipfs/go-ipns v0.1.2 h1:O/s/0ht+4Jl9+VoxoUo0zaHjnZUS+aBQIKTuzdZ/ucI=
github.com/ipfs/go-ipns v0.1.2/go.mod h1:ioQ0j02o6jdIVW+bmi18f4k2gRf0AV3kZ9KeHYHICnQ=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
@ -1964,8 +1965,9 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210915214749-c084706c2272 h1:3erb+vDS8lU1sxfDHF4/hhWyaXnhIaO+7RgL4fDZORA=
golang.org/x/crypto v0.0.0-20210915214749-c084706c2272/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b h1:QAqMVf3pSa6eeTsuklijukjXBlj7Es2QQplab+/RbQ4=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -2073,8 +2075,9 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -2196,8 +2199,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211209171907-798191bca915 h1:P+8mCzuEpyszAT6T42q0sxU+eveBAF/cJ2Kp0x6/8+0=
golang.org/x/sys v0.0.0-20211209171907-798191bca915/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf h1:MZ2shdL+ZM/XzY3ZGOnh4Nlpnxz5GSOhOmtHo3iPU6M=
@ -2451,8 +2454,9 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M=
howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0=
lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c=
lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=
lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=

View File

@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"go.uber.org/zap"
logging "github.com/ipfs/go-log/v2"
)
@ -50,7 +51,7 @@ func jaegerOptsFromEnv() jaeger.EndpointOption {
}
if e, ok = os.LookupEnv(envAgentHost); ok {
options := []jaeger.AgentEndpointOption{jaeger.WithAgentHost(e)}
options := []jaeger.AgentEndpointOption{jaeger.WithAgentHost(e), jaeger.WithLogger(zap.NewStdLog(log.Desugar()))}
var ep string
if p, ok := os.LookupEnv(envAgentPort); ok {
options = append(options, jaeger.WithAgentPort(p))
@ -82,6 +83,7 @@ func SetupJaegerTracing(serviceName string) *tracesdk.TracerProvider {
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
)),
tracesdk.WithSampler(tracesdk.AlwaysSample()),
)
otel.SetTracerProvider(tp)
tracer := tp.Tracer(serviceName)

View File

@ -3,22 +3,22 @@ package dagstore
import (
"context"
"fmt"
"io"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-state-types/abi"
)
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI
type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error)
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
Start(ctx context.Context) error
@ -27,7 +27,7 @@ type MinerAPI interface {
type SectorAccessor interface {
retrievalmarket.SectorAccessor
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error)
}
type minerAPI struct {
@ -100,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
return false, nil
}
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
err := m.readyMgr.AwaitReady()
if err != nil {
return nil, 0, err
return nil, err
}
// Throttle this path to avoid flooding the storage subsystem.
@ -114,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
})
if err != nil {
return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) == 0 {
return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}
// prefer an unsealed sector containing the piece if one exists
@ -126,7 +126,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
deal := deal
// Throttle this path to avoid flooding the storage subsystem.
var reader io.ReadCloser
var reader mount.Reader
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
isUnsealed, err := m.sa.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
@ -136,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
return nil
}
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})
@ -147,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
if reader != nil {
// we were able to obtain a reader for an already unsealed piece
return reader, deal.Length.Unpadded(), nil
return reader, nil
}
}
@ -158,7 +158,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
// block for a long time with the current PoRep
//
// This path is unthrottled.
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
@ -166,10 +166,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off
}
// Successfully fetched the deal data so return a reader over the data
return reader, deal.Length.Unpadded(), nil
return reader, nil
}
return nil, 0, lastErr
return nil, lastErr
}
func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {

View File

@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
@ -87,7 +88,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
}
// Fetch the piece
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
r, err := api.FetchUnsealedPiece(ctx, cid1)
if tc.expectErr {
require.Error(t, err)
return
@ -159,7 +160,7 @@ func TestThrottle(t *testing.T) {
errgrp, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ {
errgrp.Go(func() error {
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
r, err := api.FetchUnsealedPiece(ctx, cid1)
if err == nil {
_ = r.Close()
}
@ -203,10 +204,10 @@ type mockRPN struct {
}
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length)
return m.UnsealSectorAt(ctx, sectorID, offset, length)
}
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
atomic.AddInt32(&m.calls, 1)
m.lk.RLock()
defer m.lk.RUnlock()
@ -215,7 +216,13 @@ func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber,
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil
return struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: io.NopCloser(bytes.NewBuffer([]byte(data[:]))),
}, nil
}
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {

View File

@ -6,10 +6,9 @@ package mock_dagstore
import (
context "context"
io "io"
reflect "reflect"
abi "github.com/filecoin-project/go-state-types/abi"
mount "github.com/filecoin-project/dagstore/mount"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
@ -38,19 +37,18 @@ func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder {
}
// FetchUnsealedPiece mocks base method.
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid) (mount.Reader, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2)
ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(abi.UnpaddedPieceSize)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1)
ret0, _ := ret[0].(mount.Reader)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call {
func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1)
}
// GetUnpaddedCARSize mocks base method.

View File

@ -56,11 +56,7 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
}
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
return (&pieceReader{
ctx: ctx,
api: l.API,
pieceCid: l.PieceCid,
}).init()
return l.API.FetchUnsealedPiece(ctx, l.PieceCid)
}
func (l *LotusMount) Info() mount.Info {

View File

@ -2,6 +2,7 @@ package dagstore
import (
"context"
"io"
"io/ioutil"
"net/url"
"strings"
@ -12,8 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks"
)
@ -30,8 +29,28 @@ func TestLotusMount(t *testing.T) {
mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
mr1 := struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
ReaderAt: nil,
Seeker: nil,
}
mr2 := struct {
io.ReadCloser
io.ReaderAt
io.Seeker
}{
ReadCloser: ioutil.NopCloser(strings.NewReader("testing")),
ReaderAt: nil,
Seeker: nil,
}
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr1, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr2, nil).Times(1)
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)
mnt, err := NewLotusMount(cid, mockLotusMountAPI)

View File

@ -6,9 +6,9 @@ import (
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -127,17 +127,20 @@ type wrappedSA struct {
retrievalmarket.SectorAccessor
}
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length)
if err != nil {
return nil, err
}
if startOffset > 0 {
if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil {
return nil, xerrors.Errorf("discard start off: %w", err)
}
}
return r, err
return struct {
io.ReadCloser
io.Seeker
io.ReaderAt
}{
ReadCloser: r,
Seeker: nil,
ReaderAt: nil,
}, err
}
var _ SectorAccessor = &wrappedSA{}

View File

@ -3,7 +3,6 @@ package dagstore
import (
"bytes"
"context"
"io"
"os"
"testing"
"time"
@ -12,8 +11,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
@ -192,7 +189,7 @@ func (m mockLotusMount) Start(ctx context.Context) error {
return nil
}
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
func (m mockLotusMount) FetchUnsealedPiece(context.Context, cid.Cid) (mount.Reader, error) {
panic("implement me")
}

View File

@ -4,8 +4,16 @@ import (
"context"
"io"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi"
specstorage "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/chain/types"
@ -14,14 +22,6 @@ import (
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi"
specstorage "github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("sectoraccessor")
@ -40,10 +40,10 @@ func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilde
}
func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length)
return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, length)
}
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) {
log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length)
si, err := sa.sectorsStatus(ctx, sectorID, false)
if err != nil {
@ -69,8 +69,8 @@ func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.Secto
}
// Get a reader for the piece, unsealing the piece if necessary
log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid)
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD)
log.Debugf("read piece in sector %d, pieceOffset %d, length %d from miner %d", sectorID, pieceOffset, length, mid)
r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), length, si.Ticket.Value, commD)
if err != nil {
return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err)
}

View File

@ -995,8 +995,9 @@ github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dC
github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k=
github.com/ipfs/go-ipld-format v0.2.0 h1:xGlJKkArkmBvowr+GMCX0FEZtkro71K1AwiKnL37mwA=
github.com/ipfs/go-ipld-format v0.2.0/go.mod h1:3l3C1uKoadTPbeNfrDi+xMInYKlx2Cvg1BuydPSdzQs=
github.com/ipfs/go-ipld-legacy v0.1.0 h1:wxkkc4k8cnvIGIjPO0waJCe7SHEyFgl+yQdafdjGrpA=
github.com/ipfs/go-ipld-legacy v0.1.0/go.mod h1:86f5P/srAmh9GcIcWQR9lfFLZPrIyyXQeVlOWeeWEuI=
github.com/ipfs/go-ipld-legacy v0.1.1 h1:BvD8PEuqwBHLTKqlGFTHSwrwFOMkVESEvwIYwR2cdcc=
github.com/ipfs/go-ipld-legacy v0.1.1/go.mod h1:8AyKFCjgRPsQFf15ZQgDB8Din4DML/fOmKZkkFkrIEg=
github.com/ipfs/go-ipns v0.1.2 h1:O/s/0ht+4Jl9+VoxoUo0zaHjnZUS+aBQIKTuzdZ/ucI=
github.com/ipfs/go-ipns v0.1.2/go.mod h1:ioQ0j02o6jdIVW+bmi18f4k2gRf0AV3kZ9KeHYHICnQ=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
@ -2307,8 +2308,9 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210915214749-c084706c2272 h1:3erb+vDS8lU1sxfDHF4/hhWyaXnhIaO+7RgL4fDZORA=
golang.org/x/crypto v0.0.0-20210915214749-c084706c2272/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b h1:QAqMVf3pSa6eeTsuklijukjXBlj7Es2QQplab+/RbQ4=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -2425,8 +2427,9 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -2566,8 +2569,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211209171907-798191bca915 h1:P+8mCzuEpyszAT6T42q0sxU+eveBAF/cJ2Kp0x6/8+0=
golang.org/x/sys v0.0.0-20211209171907-798191bca915/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf h1:MZ2shdL+ZM/XzY3ZGOnh4Nlpnxz5GSOhOmtHo3iPU6M=
@ -2859,8 +2862,9 @@ k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/kube-openapi v0.0.0-20200316234421-82d701f24f9d/go.mod h1:F+5wygcW0wmRTnM3cOgIqGivxkwSWIWT5YdsDbeAOaU=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20200414100711-2df71ebbae66/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c=
lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=
lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=