diff --git a/provider/lpffi/piece_funcs.go b/provider/lpffi/piece_funcs.go
index 2ee75151b..986d0fe4d 100644
--- a/provider/lpffi/piece_funcs.go
+++ b/provider/lpffi/piece_funcs.go
@@ -65,3 +65,7 @@ func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumb
 	removeTemp = false
 	return nil
 }
+
+func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) {
+	return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
+}
diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go
index 05a7bbd80..77019e4d2 100644
--- a/provider/lpffi/sdr_funcs.go
+++ b/provider/lpffi/sdr_funcs.go
@@ -42,7 +42,7 @@ type SealCalls struct {
 	externCalls ExternalSealer*/
 }
 
-func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls {
+func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls {
 	return &SealCalls{
 		sectors: &storageProvider{
 			storage:    st,
@@ -54,7 +54,7 @@ func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCa
 }
 
 type storageProvider struct {
-	storage             paths.Store
+	storage             *paths.Remote
 	localStore          *paths.Local
 	sindex              paths.SectorIndex
 	storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go
index a632c5e87..31775d290 100644
--- a/provider/lpseal/task_trees.go
+++ b/provider/lpseal/task_trees.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"io"
 	"net/http"
+	"net/url"
+	"strconv"
 
 	"github.com/ipfs/go-cid"
 	"golang.org/x/xerrors"
@@ -88,6 +90,15 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 	var dataReader io.Reader
 	var unpaddedData bool
 
+	var closers []io.Closer
+	defer func() {
+		for _, c := range closers {
+			if err := c.Close(); err != nil {
+				log.Errorw("error closing piece reader", "error", err)
+			}
+		}
+	}()
+
 	if len(pieces) > 0 {
 		pieceInfos := make([]abi.PieceInfo, len(pieces))
 		pieceReaders := make([]io.Reader, len(pieces))
@@ -106,10 +117,49 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 
 			// make pieceReader
 			if p.DataUrl != nil {
-				pieceReaders[i], _ = padreader.New(&UrlPieceReader{
-					Url:     *p.DataUrl,
-					RawSize: *p.DataRawSize,
-				}, uint64(*p.DataRawSize))
+				dataUrl := *p.DataUrl
+
+				goUrl, err := url.Parse(dataUrl)
+				if err != nil {
+					return false, xerrors.Errorf("parsing data URL: %w", err)
+				}
+
+				if goUrl.Scheme == "pieceref" {
+					// url is to a piece reference
+
+					refNum, err := strconv.ParseInt(goUrl.Opaque, 10, 64)
+					if err != nil {
+						return false, xerrors.Errorf("parsing piece reference number: %w", err)
+					}
+
+					// get pieceID
+					var pieceID []struct {
+						PieceID storiface.PieceNumber `db:"piece_id"`
+					}
+					err = t.db.Select(ctx, &pieceID, `SELECT piece_id FROM parked_piece_refs WHERE ref_id = $1`, refNum)
+					if err != nil {
+						return false, xerrors.Errorf("getting pieceID: %w", err)
+					}
+
+					if len(pieceID) != 1 {
+						return false, xerrors.Errorf("expected 1 pieceID, got %d", len(pieceID))
+					}
+
+					pr, err := t.sc.PieceReader(ctx, pieceID[0].PieceID)
+					if err != nil {
+						return false, xerrors.Errorf("getting piece reader: %w", err)
+					}
+
+					closers = append(closers, pr)
+
+					pieceReaders[i], _ = padreader.New(pr, uint64(*p.DataRawSize))
+				} else {
+					pieceReaders[i], _ = padreader.New(&UrlPieceReader{
+						Url:     dataUrl,
+						RawSize: *p.DataRawSize,
+					}, uint64(*p.DataRawSize))
+				}
+
 			} else { // padding piece (w/o fr32 padding, added in TreeD)
 				pieceReaders[i] = nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded())
 			}
diff --git a/storage/paths/remote.go b/storage/paths/remote.go
index 882c98c4a..dfceaeace 100644
--- a/storage/paths/remote.go
+++ b/storage/paths/remote.go
@@ -747,6 +747,46 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
 	return nil, nil
 }
 
+func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storiface.SectorFileType) (io.ReadCloser, error) {
+	paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
+	if err != nil {
+		return nil, xerrors.Errorf("acquire local: %w", err)
+	}
+
+	path := storiface.PathByType(paths, ft)
+	if path != "" {
+		return os.Open(path)
+	}
+
+	si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
+	if err != nil {
+		log.Debugf("Reader, did not find file on any of the workers %s (%s)", path, ft.String())
+		return nil, err
+	}
+
+	if len(si) == 0 {
+		return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
+	}
+
+	sort.Slice(si, func(i, j int) bool {
+		return si[i].Weight > si[j].Weight
+	})
+
+	for _, info := range si {
+		for _, url := range info.URLs {
+			rd, err := r.readRemote(ctx, url, 0, 0)
+			if err != nil {
+				log.Warnw("reading from remote", "url", url, "error", err)
+				continue
+			}
+
+			return rd, err
+		}
+	}
+
+	return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
+}
+
 func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
 	log.Warnf("reserve called on remote store, sectorID: %v", sid.ID)
 	return func() {
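
For reviewers, a minimal sketch of the `pieceref:` URL round-trip that the task_trees.go change assumes. The producer side is not part of this diff; `refID` here is a hypothetical `parked_piece_refs.ref_id`:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	// Hypothetical producer side: encode a parked_piece_refs.ref_id into
	// the opaque pieceref: URL shape that TreesTask.Do now recognizes.
	refID := int64(42) // assumed ref_id, not taken from this diff
	u := url.URL{Scheme: "pieceref", Opaque: strconv.FormatInt(refID, 10)}
	dataUrl := u.String() // "pieceref:42"

	// Consumer side, mirroring the new branch in TreesTask.Do: parse the
	// URL and recover the reference number from the opaque component.
	goUrl, err := url.Parse(dataUrl)
	if err != nil {
		panic(err)
	}
	refNum, err := strconv.ParseInt(goUrl.Opaque, 10, 64)
	if err != nil {
		panic(err)
	}
	fmt.Println(goUrl.Scheme, refNum) // pieceref 42
}
```

Since the reference number travels in the opaque component (`pieceref:42`, not `pieceref://42`), `url.Parse` keeps it in `Opaque`, which is why the new branch reads `goUrl.Opaque` rather than `goUrl.Host`.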
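And a sketch of the intended lifetime of the reader returned by the new `SealCalls.PieceReader`, which is backed by `Remote.ReaderSeq` (open the local file when present, otherwise stream from the highest-weight remote URL). This is a hypothetical helper, not code from the diff, and the import paths are assumed from the repo layout:

```go
package example

import (
	"context"
	"io"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/provider/lpffi"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// copyPiece shows the intended use of PieceReader: the returned
// io.ReadCloser may be a local *os.File or a remote HTTP body, and either
// way the caller owns Close, which is why TreesTask.Do collects the
// readers in a closers slice and closes them on exit.
func copyPiece(ctx context.Context, sc *lpffi.SealCalls, pn storiface.PieceNumber, dst io.Writer) error {
	pr, err := sc.PieceReader(ctx, pn)
	if err != nil {
		return xerrors.Errorf("getting piece reader: %w", err)
	}
	defer pr.Close()

	if _, err := io.Copy(dst, pr); err != nil {
		return xerrors.Errorf("copying piece data: %w", err)
	}
	return nil
}
```

Note that `ReaderSeq`, unlike the offset-based `Reader` above it in remote.go, reads the whole file sequentially, which fits TreeD's single linear pass over the piece data.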