v1.27.0-a #10

Closed
jonathanface wants to merge 473 commits from v1.27.0-a into master
4 changed files with 100 additions and 6 deletions
Showing only changes of commit b90cf19604 - Show all commits

View File

@ -65,3 +65,7 @@ func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumb
removeTemp = false removeTemp = false
return nil return nil
} }
func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) {
return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
}

View File

@ -42,7 +42,7 @@ type SealCalls struct {
externCalls ExternalSealer*/ externCalls ExternalSealer*/
} }
func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls { func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls {
return &SealCalls{ return &SealCalls{
sectors: &storageProvider{ sectors: &storageProvider{
storage: st, storage: st,
@ -54,7 +54,7 @@ func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCa
} }
type storageProvider struct { type storageProvider struct {
storage paths.Store storage *paths.Remote
localStore *paths.Local localStore *paths.Local
sindex paths.SectorIndex sindex paths.SectorIndex
storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation] storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]

View File

@ -4,6 +4,8 @@ import (
"context" "context"
"io" "io"
"net/http" "net/http"
"net/url"
"strconv"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -88,6 +90,15 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
var dataReader io.Reader var dataReader io.Reader
var unpaddedData bool var unpaddedData bool
var closers []io.Closer
defer func() {
for _, c := range closers {
if err := c.Close(); err != nil {
log.Errorw("error closing piece reader", "error", err)
}
}
}()
if len(pieces) > 0 { if len(pieces) > 0 {
pieceInfos := make([]abi.PieceInfo, len(pieces)) pieceInfos := make([]abi.PieceInfo, len(pieces))
pieceReaders := make([]io.Reader, len(pieces)) pieceReaders := make([]io.Reader, len(pieces))
@ -106,10 +117,49 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
// make pieceReader // make pieceReader
if p.DataUrl != nil { if p.DataUrl != nil {
pieceReaders[i], _ = padreader.New(&UrlPieceReader{ dataUrl := *p.DataUrl
Url: *p.DataUrl,
RawSize: *p.DataRawSize, goUrl, err := url.Parse(dataUrl)
}, uint64(*p.DataRawSize)) if err != nil {
return false, xerrors.Errorf("parsing data URL: %w", err)
}
if goUrl.Scheme == "pieceref" {
// url is to a piece reference
refNum, err := strconv.ParseInt(goUrl.Opaque, 10, 64)
if err != nil {
return false, xerrors.Errorf("parsing piece reference number: %w", err)
}
// get pieceID
var pieceID []struct {
PieceID storiface.PieceNumber `db:"piece_id"`
}
err = t.db.Select(ctx, &pieceID, `SELECT piece_id FROM parked_piece_refs WHERE ref_id = $1`, refNum)
if err != nil {
return false, xerrors.Errorf("getting pieceID: %w", err)
}
if len(pieceID) != 1 {
return false, xerrors.Errorf("expected 1 pieceID, got %d", len(pieceID))
}
pr, err := t.sc.PieceReader(ctx, pieceID[0].PieceID)
if err != nil {
return false, xerrors.Errorf("getting piece reader: %w", err)
}
closers = append(closers, pr)
pieceReaders[i], _ = padreader.New(pr, uint64(*p.DataRawSize))
} else {
pieceReaders[i], _ = padreader.New(&UrlPieceReader{
Url: dataUrl,
RawSize: *p.DataRawSize,
}, uint64(*p.DataRawSize))
}
} else { // padding piece (w/o fr32 padding, added in TreeD) } else { // padding piece (w/o fr32 padding, added in TreeD)
pieceReaders[i] = nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded()) pieceReaders[i] = nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded())
} }

View File

@ -747,6 +747,46 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
return nil, nil return nil, nil
} }
func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storiface.SectorFileType) (io.ReadCloser, error) {
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire local: %w", err)
}
path := storiface.PathByType(paths, ft)
if path != "" {
return os.Open(path)
}
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
log.Debugf("Reader, did not find file on any of the workers %s (%s)", path, ft.String())
return nil, err
}
if len(si) == 0 {
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
sort.Slice(si, func(i, j int) bool {
return si[i].Weight > si[j].Weight
})
for _, info := range si {
for _, url := range info.URLs {
rd, err := r.readRemote(ctx, url, 0, 0)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
continue
}
return rd, err
}
}
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) { func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
log.Warnf("reserve called on remote store, sectorID: %v", sid.ID) log.Warnf("reserve called on remote store, sectorID: %v", sid.ID)
return func() { return func() {