diff --git a/chain/vm/runtime.go b/chain/vm/runtime.go index 6e94030bd..72f4daab8 100644 --- a/chain/vm/runtime.go +++ b/chain/vm/runtime.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "fmt" + "os" gruntime "runtime" "time" @@ -52,7 +53,7 @@ func (m *Message) ValueReceived() abi.TokenAmount { } // EnableGasTracing, if true, outputs gas tracing in execution traces. -var EnableGasTracing = false +var EnableGasTracing = os.Getenv("LOTUS_VM_ENABLE_GAS_TRACING_VERY_SLOW") == "1" type Runtime struct { rt5.Message diff --git a/cmd/lotus-miner/retrieval-deals.go b/cmd/lotus-miner/retrieval-deals.go index 1ce1f6593..bd5d30a4e 100644 --- a/cmd/lotus-miner/retrieval-deals.go +++ b/cmd/lotus-miner/retrieval-deals.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "sort" "text/tabwriter" "github.com/docker/go-units" @@ -137,6 +138,10 @@ var retrievalDealsListCmd = &cli.Command{ return err } + sort.Slice(deals, func(i, j int) bool { + return deals[i].ID < deals[j].ID + }) + w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) _, _ = fmt.Fprintf(w, "Receiver\tDealID\tPayload\tState\tPricePerByte\tBytesSent\tMessage\n") diff --git a/extern/sector-storage/fr32/fr32.go b/extern/sector-storage/fr32/fr32.go index 17e6a1142..24175719c 100644 --- a/extern/sector-storage/fr32/fr32.go +++ b/extern/sector-storage/fr32/fr32.go @@ -8,7 +8,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" ) -var MTTresh = uint64(32 << 20) +var MTTresh = uint64(512 << 10) func mtChunkCount(usz abi.PaddedPieceSize) uint64 { threads := (uint64(usz)) / MTTresh diff --git a/extern/sector-storage/fr32/readers.go b/extern/sector-storage/fr32/readers.go index f14d5bf1c..163c520aa 100644 --- a/extern/sector-storage/fr32/readers.go +++ b/extern/sector-storage/fr32/readers.go @@ -16,13 +16,21 @@ type unpadReader struct { work []byte } +func BufSize(sz abi.PaddedPieceSize) int { + return int(MTTresh * mtChunkCount(sz)) +} + func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) { + buf := make([]byte, BufSize(sz)) + + return NewUnpadReaderBuf(src, sz, buf) +} + +func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) { if err := sz.Validate(); err != nil { return nil, xerrors.Errorf("bad piece size: %w", err) } - buf := make([]byte, MTTresh*mtChunkCount(sz)) - return &unpadReader{ src: src, diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index eeb1404ad..8eaed54f6 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -12,6 +12,7 @@ import ( proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof" + "github.com/filecoin-project/dagstore/mount" ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper" commcid "github.com/filecoin-project/go-fil-commcid" "github.com/filecoin-project/go-state-types/abi" @@ -384,12 +385,22 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea } } -func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { +func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) { if uint64(offset) != 0 { panic("implme") } - return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil + br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size]) + + return struct { + io.ReadCloser + io.Seeker + io.ReaderAt + }{ + ReadCloser: ioutil.NopCloser(br), + Seeker: br, + ReaderAt: br, + }, false, nil } func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) { diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index aa2ef0d0d..4622289e8 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/go-cid" "golang.org/x/xerrors" + "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-storage/storage" @@ -27,7 +28,7 @@ type PieceProvider interface { // default in most cases, but this might matter with future PoRep) // startOffset is added to the pieceOffset to get the starting reader offset. // The number of bytes that can be read is pieceSize-startOffset - ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) + ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) } @@ -71,37 +72,91 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef // It will NOT try to schedule an Unseal of a sealed sector file for the read. // // Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers. -func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) { +func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) { // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { cancel() - return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err) + return nil, xerrors.Errorf("acquiring read sector lock: %w", err) } - // Reader returns a reader for an unsealed piece at the given offset in the given sector. + // Reader returns a reader getter for an unsealed piece at the given offset in the given sector. // The returned reader will be nil if none of the workers has an unsealed sector file containing // the unsealed piece. - r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded())) + rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded()) if err != nil { + cancel() log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err) - cancel() - return nil, nil, err + return nil, err } - if r == nil { + if rg == nil { cancel() + return nil, nil } - return r, cancel, nil + buf := make([]byte, fr32.BufSize(size.Padded())) + + pr, err := (&pieceReader{ + ctx: ctx, + getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) { + startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127 + + r, err := rg(startOffsetAligned.Padded()) + if err != nil { + return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err) + } + + upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf) + if err != nil { + r.Close() // nolint + return nil, xerrors.Errorf("creating unpadded reader: %w", err) + } + + bir := bufio.NewReaderSize(upr, 127) + if startOffset > uint64(startOffsetAligned) { + if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil { + r.Close() // nolint + return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err) + } + } + + return struct { + io.Reader + io.Closer + }{ + Reader: bir, + Closer: funcCloser(func() error { + return r.Close() + }), + }, nil + }, + len: size, + onClose: cancel, + pieceCid: pc, + }).init() + if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil + cancel() + return nil, err + } + + return pr, err } +type funcCloser func() error + +func (f funcCloser) Close() error { + return f() +} + +var _ io.Closer = funcCloser(nil) + // ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector // If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read. // Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it. // If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal, // the returned boolean parameter will be set to true. // If we have an existing unsealed file containing the given piece, the returned boolean will be set to false. -func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) { +func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) { if err := pieceOffset.Valid(); err != nil { return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err) } @@ -109,9 +164,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err) } - startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127 - - r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size) + r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size) log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err) @@ -142,7 +195,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) - r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size) + r, err = p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size) if err != nil { log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err) return nil, true, xerrors.Errorf("read after unsealing: %w", err) @@ -156,34 +209,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) } - upr, err := fr32.NewUnpadReader(r, size.Padded()) - if err != nil { - unlock() - return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err) - } + log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) - log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size) - - bir := bufio.NewReaderSize(upr, 127) - if startOffset > uint64(startOffsetAligned) { - if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil { - return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err) - } - } - - return &funcCloser{ - Reader: bir, - close: func() error { - err = r.Close() - unlock() - return err - }, - }, uns, nil + return r, uns, nil } - -type funcCloser struct { - io.Reader - close func() error -} - -func (fc *funcCloser) Close() error { return fc.close() } diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go index 1aad3d2d2..3ace2916e 100644 --- a/extern/sector-storage/piece_provider_test.go +++ b/extern/sector-storage/piece_provider_test.go @@ -338,7 +338,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, expectedHadToUnseal bool, expectedBytes []byte) { - rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD) + rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD) require.NoError(t, err) require.NotNil(t, rd) require.Equal(t, expectedHadToUnseal, isUnsealed) diff --git a/markets/dagstore/piecereader.go b/extern/sector-storage/piece_reader.go similarity index 88% rename from markets/dagstore/piecereader.go rename to extern/sector-storage/piece_reader.go index 67cd10c27..d7a3f4e98 100644 --- a/markets/dagstore/piecereader.go +++ b/extern/sector-storage/piece_reader.go @@ -1,4 +1,4 @@ -package dagstore +package sectorstorage import ( "bufio" @@ -19,11 +19,14 @@ import ( var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M var ReadBuf = 128 * (127 * 8) // unpadded(128k) +type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error) + type pieceReader struct { - ctx context.Context - api MinerAPI - pieceCid cid.Cid - len abi.UnpaddedPieceSize + ctx context.Context + getReader pieceGetter + pieceCid cid.Cid + len abi.UnpaddedPieceSize + onClose context.CancelFunc closed bool seqAt int64 // next byte to be read by io.Reader @@ -37,10 +40,14 @@ func (p *pieceReader) init() (_ *pieceReader, err error) { stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1)) p.rAt = 0 - p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) + p.r, err = p.getReader(p.ctx, uint64(p.rAt)) if err != nil { return nil, err } + if p.r == nil { + return nil, nil + } + p.br = bufio.NewReaderSize(p.r, ReadBuf) return p, nil @@ -60,12 +67,19 @@ func (p *pieceReader) Close() error { } if p.r != nil { + if err := p.r.Close(); err != nil { + return err + } if err := p.r.Close(); err != nil { return err } p.r = nil } + p.onClose() + + p.closed = true + return nil } @@ -127,7 +141,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { } p.rAt = off - p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) + p.r, err = p.getReader(p.ctx, uint64(p.rAt)) p.br = bufio.NewReaderSize(p.r, ReadBuf) if err != nil { return 0, xerrors.Errorf("getting backing reader: %w", err) diff --git a/extern/sector-storage/stores/http_handler.go b/extern/sector-storage/stores/http_handler.go index 5996392d8..771a9a3a1 100644 --- a/extern/sector-storage/stores/http_handler.go +++ b/extern/sector-storage/stores/http_handler.go @@ -84,7 +84,6 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request // remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request. // returns an error if it does NOT have the required sector file/dir. func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) { - log.Infof("SERVE GET %s", r.URL) vars := mux.Vars(r) id, err := storiface.ParseSectorID(vars["id"]) diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 7935556a9..bd6b34be3 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -585,7 +585,7 @@ func (r *Remote) CheckIsUnsealed(ctx context.Context, s storage.SectorRef, offse // 1. no worker(local worker included) has an unsealed file for the given sector OR // 2. no worker(local worker included) has the unsealed piece in their unsealed sector file. // Will return a nil reader and a nil error in such a case. -func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { +func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size abi.PaddedPieceSize) (func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error), error) { ft := storiface.FTUnsealed // check if we have the unsealed sector file locally @@ -623,7 +623,52 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a if has { log.Infof("returning piece reader for local unsealed piece sector=%+v, (offset=%d, size=%d)", s.ID, offset, size) - return r.pfHandler.Reader(pf, storiface.PaddedByteIndex(offset), size) + + return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + // don't reuse between readers unless closed + f := pf + pf = nil + + if f == nil { + f, err = r.pfHandler.OpenPartialFile(abi.PaddedPieceSize(ssize), path) + if err != nil { + return nil, xerrors.Errorf("opening partial file: %w", err) + } + log.Debugf("local partial file (re)opened %s (+%d,%d)", path, offset, size) + } + + r, err := r.pfHandler.Reader(f, storiface.PaddedByteIndex(offset)+startOffsetAligned, size-abi.PaddedPieceSize(startOffsetAligned)) + if err != nil { + return nil, err + } + + return struct { + io.Reader + io.Closer + }{ + Reader: r, + Closer: funcCloser(func() error { + // if we already have a reader cached, close this one + if pf != nil { + if f == nil { + return nil + } + if pf == f { + pf = nil + } + + tmp := f + f = nil + return tmp.Close() + } + + // otherwise stash it away for reuse + pf = f + return nil + }), + }, nil + }, nil + } log.Debugf("miner has unsealed file but not unseal piece, %s (+%d,%d)", path, offset, size) @@ -666,16 +711,18 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a continue } - // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. - // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. - rd, err := r.readRemote(ctx, url, offset, size) - if err != nil { - log.Warnw("reading from remote", "url", url, "error", err) - lastErr = err - continue - } - log.Infof("Read remote %s (+%d,%d)", url, offset, size) - return rd, nil + return func(startOffsetAligned storiface.PaddedByteIndex) (io.ReadCloser, error) { + // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. + // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. + rd, err := r.readRemote(ctx, url, offset+abi.PaddedPieceSize(startOffsetAligned), size) + if err != nil { + log.Warnw("reading from remote", "url", url, "error", err) + return nil, err + } + + return rd, err + }, nil + } } @@ -692,3 +739,11 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac } var _ Store = &Remote{} + +type funcCloser func() error + +func (f funcCloser) Close() error { + return f() +} + +var _ io.Closer = funcCloser(nil) diff --git a/extern/sector-storage/stores/remote_test.go b/extern/sector-storage/stores/remote_test.go index ea9179655..0bc439dee 100644 --- a/extern/sector-storage/stores/remote_test.go +++ b/extern/sector-storage/stores/remote_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -470,12 +471,20 @@ func TestReader(t *testing.T) { remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler) - rd, err := remoteStore.Reader(ctx, sectorRef, offset, size) + rdg, err := remoteStore.Reader(ctx, sectorRef, offset, size) + var rd io.ReadCloser if tc.errStr != "" { - require.Error(t, err) - require.Nil(t, rd) - require.Contains(t, err.Error(), tc.errStr) + if rdg == nil { + require.Error(t, err) + require.Nil(t, rdg) + require.Contains(t, err.Error(), tc.errStr) + } else { + rd, err = rdg(0) + require.Error(t, err) + require.Nil(t, rd) + require.Contains(t, err.Error(), tc.errStr) + } } else { require.NoError(t, err) } @@ -483,7 +492,10 @@ func TestReader(t *testing.T) { if !tc.expectedNonNilReader { require.Nil(t, rd) } else { - require.NotNil(t, rd) + require.NotNil(t, rdg) + rd, err := rdg(0) + require.NoError(t, err) + defer func() { require.NoError(t, rd.Close()) }() diff --git a/go.mod b/go.mod index d0685be51..45cefb7a2 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-ipld-cbor v0.0.6 github.com/ipfs/go-ipld-format v0.2.0 + github.com/ipfs/go-ipld-legacy v0.1.1 // indirect github.com/ipfs/go-log/v2 v2.4.0 github.com/ipfs/go-merkledag v0.5.1 github.com/ipfs/go-metrics-interface v0.0.1 @@ -156,14 +157,16 @@ require ( go.uber.org/fx v1.9.0 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.19.1 - golang.org/x/net v0.0.0-20210917221730-978cfadd31cf + golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect + golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac + golang.org/x/sys v0.0.0-20211209171907-798191bca915 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac golang.org/x/tools v0.1.5 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 gopkg.in/cheggaaa/pb.v1 v1.0.28 gotest.tools v2.2.0+incompatible + lukechampine.com/blake3 v1.1.7 // indirect ) replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi diff --git a/go.sum b/go.sum index 5ecad5936..e18478aab 100644 --- a/go.sum +++ b/go.sum @@ -774,8 +774,9 @@ github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dC github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k= github.com/ipfs/go-ipld-format v0.2.0 h1:xGlJKkArkmBvowr+GMCX0FEZtkro71K1AwiKnL37mwA= github.com/ipfs/go-ipld-format v0.2.0/go.mod h1:3l3C1uKoadTPbeNfrDi+xMInYKlx2Cvg1BuydPSdzQs= -github.com/ipfs/go-ipld-legacy v0.1.0 h1:wxkkc4k8cnvIGIjPO0waJCe7SHEyFgl+yQdafdjGrpA= github.com/ipfs/go-ipld-legacy v0.1.0/go.mod h1:86f5P/srAmh9GcIcWQR9lfFLZPrIyyXQeVlOWeeWEuI= +github.com/ipfs/go-ipld-legacy v0.1.1 h1:BvD8PEuqwBHLTKqlGFTHSwrwFOMkVESEvwIYwR2cdcc= +github.com/ipfs/go-ipld-legacy v0.1.1/go.mod h1:8AyKFCjgRPsQFf15ZQgDB8Din4DML/fOmKZkkFkrIEg= github.com/ipfs/go-ipns v0.1.2 h1:O/s/0ht+4Jl9+VoxoUo0zaHjnZUS+aBQIKTuzdZ/ucI= github.com/ipfs/go-ipns v0.1.2/go.mod h1:ioQ0j02o6jdIVW+bmi18f4k2gRf0AV3kZ9KeHYHICnQ= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= @@ -1964,8 +1965,9 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20210915214749-c084706c2272 h1:3erb+vDS8lU1sxfDHF4/hhWyaXnhIaO+7RgL4fDZORA= golang.org/x/crypto v0.0.0-20210915214749-c084706c2272/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b h1:QAqMVf3pSa6eeTsuklijukjXBlj7Es2QQplab+/RbQ4= +golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2073,8 +2075,9 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ= golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -2196,8 +2199,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= -golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211209171907-798191bca915 h1:P+8mCzuEpyszAT6T42q0sxU+eveBAF/cJ2Kp0x6/8+0= +golang.org/x/sys v0.0.0-20211209171907-798191bca915/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf h1:MZ2shdL+ZM/XzY3ZGOnh4Nlpnxz5GSOhOmtHo3iPU6M= @@ -2451,8 +2454,9 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M= howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= -lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c= lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= +lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0= +lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8= diff --git a/lib/tracing/setup.go b/lib/tracing/setup.go index 1a2d8128e..d90099f79 100644 --- a/lib/tracing/setup.go +++ b/lib/tracing/setup.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.7.0" + "go.uber.org/zap" logging "github.com/ipfs/go-log/v2" ) @@ -50,7 +51,7 @@ func jaegerOptsFromEnv() jaeger.EndpointOption { } if e, ok = os.LookupEnv(envAgentHost); ok { - options := []jaeger.AgentEndpointOption{jaeger.WithAgentHost(e)} + options := []jaeger.AgentEndpointOption{jaeger.WithAgentHost(e), jaeger.WithLogger(zap.NewStdLog(log.Desugar()))} var ep string if p, ok := os.LookupEnv(envAgentPort); ok { options = append(options, jaeger.WithAgentPort(p)) @@ -82,6 +83,7 @@ func SetupJaegerTracing(serviceName string) *tracesdk.TracerProvider { semconv.SchemaURL, semconv.ServiceNameKey.String(serviceName), )), + tracesdk.WithSampler(tracesdk.AlwaysSample()), ) otel.SetTracerProvider(tp) tracer := tp.Tracer(serviceName) diff --git a/markets/dagstore/miner_api.go b/markets/dagstore/miner_api.go index d59a05846..77b4b97bf 100644 --- a/markets/dagstore/miner_api.go +++ b/markets/dagstore/miner_api.go @@ -3,22 +3,22 @@ package dagstore import ( "context" "fmt" - "io" - "github.com/filecoin-project/dagstore/throttle" - "github.com/filecoin-project/go-state-types/abi" "github.com/ipfs/go-cid" "golang.org/x/xerrors" + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" + "github.com/filecoin-project/go-state-types/abi" ) //go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI type MinerAPI interface { - FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) + FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) Start(ctx context.Context) error @@ -27,7 +27,7 @@ type MinerAPI interface { type SectorAccessor interface { retrievalmarket.SectorAccessor - UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) + UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) } type minerAPI struct { @@ -100,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro return false, nil } -func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { +func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) { err := m.readyMgr.AwaitReady() if err != nil { - return nil, 0, err + return nil, err } // Throttle this path to avoid flooding the storage subsystem. @@ -114,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off }) if err != nil { - return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) + return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) } if len(pieceInfo.Deals) == 0 { - return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid) + return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid) } // prefer an unsealed sector containing the piece if one exists @@ -126,7 +126,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off deal := deal // Throttle this path to avoid flooding the storage subsystem. - var reader io.ReadCloser + var reader mount.Reader err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { isUnsealed, err := m.sa.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) if err != nil { @@ -136,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off return nil } // Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing. - reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded()) + reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) return err }) @@ -147,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off if reader != nil { // we were able to obtain a reader for an already unsealed piece - return reader, deal.Length.Unpadded(), nil + return reader, nil } } @@ -158,7 +158,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off // block for a long time with the current PoRep // // This path is unthrottled. - reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded()) + reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) if err != nil { lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) log.Warn(lastErr.Error()) @@ -166,10 +166,10 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, off } // Successfully fetched the deal data so return a reader over the data - return reader, deal.Length.Unpadded(), nil + return reader, nil } - return nil, 0, lastErr + return nil, lastErr } func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { diff --git a/markets/dagstore/miner_api_test.go b/markets/dagstore/miner_api_test.go index 38c3a4fc3..45cbf2461 100644 --- a/markets/dagstore/miner_api_test.go +++ b/markets/dagstore/miner_api_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/actors/builtin/paych" @@ -87,7 +88,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) { } // Fetch the piece - r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0) + r, err := api.FetchUnsealedPiece(ctx, cid1) if tc.expectErr { require.Error(t, err) return @@ -159,7 +160,7 @@ func TestThrottle(t *testing.T) { errgrp, ctx := errgroup.WithContext(context.Background()) for i := 0; i < 10; i++ { errgrp.Go(func() error { - r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0) + r, err := api.FetchUnsealedPiece(ctx, cid1) if err == nil { _ = r.Close() } @@ -203,10 +204,10 @@ type mockRPN struct { } func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { - return m.UnsealSectorAt(ctx, sectorID, offset, 0, length) + return m.UnsealSectorAt(ctx, sectorID, offset, length) } -func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { +func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) { atomic.AddInt32(&m.calls, 1) m.lk.RLock() defer m.lk.RUnlock() @@ -215,7 +216,13 @@ func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, if !ok { panic("sector not found") } - return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil + return struct { + io.ReadCloser + io.ReaderAt + io.Seeker + }{ + ReadCloser: io.NopCloser(bytes.NewBuffer([]byte(data[:]))), + }, nil } func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { diff --git a/markets/dagstore/mocks/mock_lotus_accessor.go b/markets/dagstore/mocks/mock_lotus_accessor.go index e10a1b053..19923cc2a 100644 --- a/markets/dagstore/mocks/mock_lotus_accessor.go +++ b/markets/dagstore/mocks/mock_lotus_accessor.go @@ -6,10 +6,9 @@ package mock_dagstore import ( context "context" - io "io" reflect "reflect" - abi "github.com/filecoin-project/go-state-types/abi" + mount "github.com/filecoin-project/dagstore/mount" gomock "github.com/golang/mock/gomock" cid "github.com/ipfs/go-cid" ) @@ -38,19 +37,18 @@ func (m *MockMinerAPI) EXPECT() *MockMinerAPIMockRecorder { } // FetchUnsealedPiece mocks base method. -func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid, arg2 uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { +func (m *MockMinerAPI) FetchUnsealedPiece(arg0 context.Context, arg1 cid.Cid) (mount.Reader, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1, arg2) - ret0, _ := ret[0].(io.ReadCloser) - ret1, _ := ret[1].(abi.UnpaddedPieceSize) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret := m.ctrl.Call(m, "FetchUnsealedPiece", arg0, arg1) + ret0, _ := ret[0].(mount.Reader) + ret1, _ := ret[1].(error) + return ret0, ret1 } // FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece. -func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockMinerAPIMockRecorder) FetchUnsealedPiece(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockMinerAPI)(nil).FetchUnsealedPiece), arg0, arg1) } // GetUnpaddedCARSize mocks base method. diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index e4e6242d7..0ecdc9808 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -56,11 +56,7 @@ func (l *LotusMount) Deserialize(u *url.URL) error { } func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { - return (&pieceReader{ - ctx: ctx, - api: l.API, - pieceCid: l.PieceCid, - }).init() + return l.API.FetchUnsealedPiece(ctx, l.PieceCid) } func (l *LotusMount) Info() mount.Info { diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index 820058786..d6ea54964 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -2,6 +2,7 @@ package dagstore import ( "context" + "io" "io/ioutil" "net/url" "strings" @@ -12,8 +13,6 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/dagstore/mount" - "github.com/filecoin-project/go-state-types/abi" - mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks" ) @@ -30,8 +29,28 @@ func TestLotusMount(t *testing.T) { mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl) mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1) - mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1) - mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1) + + mr1 := struct { + io.ReadCloser + io.ReaderAt + io.Seeker + }{ + ReadCloser: ioutil.NopCloser(strings.NewReader("testing")), + ReaderAt: nil, + Seeker: nil, + } + mr2 := struct { + io.ReadCloser + io.ReaderAt + io.Seeker + }{ + ReadCloser: ioutil.NopCloser(strings.NewReader("testing")), + ReaderAt: nil, + Seeker: nil, + } + + mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr1, nil).Times(1) + mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(mr2, nil).Times(1) mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1) mnt, err := NewLotusMount(cid, mockLotusMountAPI) diff --git a/markets/dagstore/wrapper_migration_test.go b/markets/dagstore/wrapper_migration_test.go index 502105499..e46f8779b 100644 --- a/markets/dagstore/wrapper_migration_test.go +++ b/markets/dagstore/wrapper_migration_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/stretchr/testify/require" - "golang.org/x/xerrors" "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -127,17 +127,20 @@ type wrappedSA struct { retrievalmarket.SectorAccessor } -func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { +func (w *wrappedSA) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) { r, err := w.UnsealSector(ctx, sectorID, pieceOffset, length) if err != nil { return nil, err } - if startOffset > 0 { - if _, err := io.CopyN(io.Discard, r, int64(startOffset)); err != nil { - return nil, xerrors.Errorf("discard start off: %w", err) - } - } - return r, err + return struct { + io.ReadCloser + io.Seeker + io.ReaderAt + }{ + ReadCloser: r, + Seeker: nil, + ReaderAt: nil, + }, err } var _ SectorAccessor = &wrappedSA{} diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index a4a6215e1..48e01100b 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -3,7 +3,6 @@ package dagstore import ( "bytes" "context" - "io" "os" "testing" "time" @@ -12,8 +11,6 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/dagstore" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/shard" @@ -192,7 +189,7 @@ func (m mockLotusMount) Start(ctx context.Context) error { return nil } -func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) { +func (m mockLotusMount) FetchUnsealedPiece(context.Context, cid.Cid) (mount.Reader, error) { panic("implement me") } diff --git a/markets/sectoraccessor/sectoraccessor.go b/markets/sectoraccessor/sectoraccessor.go index f70aca103..4320e3fb1 100644 --- a/markets/sectoraccessor/sectoraccessor.go +++ b/markets/sectoraccessor/sectoraccessor.go @@ -4,8 +4,16 @@ import ( "context" "io" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-state-types/abi" + specstorage "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/chain/types" @@ -14,14 +22,6 @@ import ( "github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/sectorblocks" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-state-types/abi" - specstorage "github.com/filecoin-project/specs-storage/storage" - - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" ) var log = logging.Logger("sectoraccessor") @@ -40,10 +40,10 @@ func NewSectorAccessor(maddr dtypes.MinerAddress, secb sectorblocks.SectorBuilde } func (sa *sectorAccessor) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { - return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, 0, length) + return sa.UnsealSectorAt(ctx, sectorID, pieceOffset, length) } -func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { +func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (mount.Reader, error) { log.Debugf("get sector %d, pieceOffset %d, length %d", sectorID, pieceOffset, length) si, err := sa.sectorsStatus(ctx, sectorID, false) if err != nil { @@ -69,8 +69,8 @@ func (sa *sectorAccessor) UnsealSectorAt(ctx context.Context, sectorID abi.Secto } // Get a reader for the piece, unsealing the piece if necessary - log.Debugf("read piece in sector %d, pieceOffset %d, startOffset %d, length %d from miner %d", sectorID, pieceOffset, startOffset, length, mid) - r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), startOffset, length, si.Ticket.Value, commD) + log.Debugf("read piece in sector %d, pieceOffset %d, length %d from miner %d", sectorID, pieceOffset, length, mid) + r, unsealed, err := sa.pp.ReadPiece(ctx, ref, storiface.UnpaddedByteIndex(pieceOffset), length, si.Ticket.Value, commD) if err != nil { return nil, xerrors.Errorf("failed to unseal piece from sector %d: %w", sectorID, err) } diff --git a/testplans/lotus-soup/go.sum b/testplans/lotus-soup/go.sum index 905c9d924..d27b270f8 100644 --- a/testplans/lotus-soup/go.sum +++ b/testplans/lotus-soup/go.sum @@ -995,8 +995,9 @@ github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dC github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k= github.com/ipfs/go-ipld-format v0.2.0 h1:xGlJKkArkmBvowr+GMCX0FEZtkro71K1AwiKnL37mwA= github.com/ipfs/go-ipld-format v0.2.0/go.mod h1:3l3C1uKoadTPbeNfrDi+xMInYKlx2Cvg1BuydPSdzQs= -github.com/ipfs/go-ipld-legacy v0.1.0 h1:wxkkc4k8cnvIGIjPO0waJCe7SHEyFgl+yQdafdjGrpA= github.com/ipfs/go-ipld-legacy v0.1.0/go.mod h1:86f5P/srAmh9GcIcWQR9lfFLZPrIyyXQeVlOWeeWEuI= +github.com/ipfs/go-ipld-legacy v0.1.1 h1:BvD8PEuqwBHLTKqlGFTHSwrwFOMkVESEvwIYwR2cdcc= +github.com/ipfs/go-ipld-legacy v0.1.1/go.mod h1:8AyKFCjgRPsQFf15ZQgDB8Din4DML/fOmKZkkFkrIEg= github.com/ipfs/go-ipns v0.1.2 h1:O/s/0ht+4Jl9+VoxoUo0zaHjnZUS+aBQIKTuzdZ/ucI= github.com/ipfs/go-ipns v0.1.2/go.mod h1:ioQ0j02o6jdIVW+bmi18f4k2gRf0AV3kZ9KeHYHICnQ= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= @@ -2307,8 +2308,9 @@ golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5 golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20210915214749-c084706c2272 h1:3erb+vDS8lU1sxfDHF4/hhWyaXnhIaO+7RgL4fDZORA= golang.org/x/crypto v0.0.0-20210915214749-c084706c2272/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b h1:QAqMVf3pSa6eeTsuklijukjXBlj7Es2QQplab+/RbQ4= +golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2425,8 +2427,9 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ= golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -2566,8 +2569,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw= -golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211209171907-798191bca915 h1:P+8mCzuEpyszAT6T42q0sxU+eveBAF/cJ2Kp0x6/8+0= +golang.org/x/sys v0.0.0-20211209171907-798191bca915/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf h1:MZ2shdL+ZM/XzY3ZGOnh4Nlpnxz5GSOhOmtHo3iPU6M= @@ -2859,8 +2862,9 @@ k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/kube-openapi v0.0.0-20200316234421-82d701f24f9d/go.mod h1:F+5wygcW0wmRTnM3cOgIqGivxkwSWIWT5YdsDbeAOaU= k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20200414100711-2df71ebbae66/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -lukechampine.com/blake3 v1.1.6 h1:H3cROdztr7RCfoaTpGZFQsrqvweFLrqS73j7L7cmR5c= lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= +lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0= +lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=