address review comments
This commit is contained in:
parent
c853350bdf
commit
db5c88196d
8
extern/sector-storage/piece_provider.go
vendored
8
extern/sector-storage/piece_provider.go
vendored
@ -52,6 +52,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
|
|||||||
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
|
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
|
||||||
|
// The returned reader will be nil if none of the workers has an unsealed sector file containing
|
||||||
|
// the unsealed piece.
|
||||||
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
|
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
@ -85,7 +88,11 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
|
|
||||||
var uns bool
|
var uns bool
|
||||||
|
|
||||||
if r == nil {
|
if r == nil {
|
||||||
|
// a nil reader means that none of the workers has an unsealed sector file
|
||||||
|
// containing the unsealed piece.
|
||||||
|
// we now need to unseal a sealed sector file for the given sector to read the unsealed piece from it.
|
||||||
uns = true
|
uns = true
|
||||||
commd := &unsealed
|
commd := &unsealed
|
||||||
if unsealed == cid.Undef {
|
if unsealed == cid.Undef {
|
||||||
@ -111,6 +118,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
|
|||||||
|
|
||||||
upr, err := fr32.NewUnpadReader(r, size.Padded())
|
upr, err := fr32.NewUnpadReader(r, size.Padded())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
unlock()
|
||||||
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
|
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
95
extern/sector-storage/stores/remote.go
vendored
95
extern/sector-storage/stores/remote.go
vendored
@ -498,52 +498,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
|||||||
}
|
}
|
||||||
|
|
||||||
path := storiface.PathByType(paths, ft)
|
path := storiface.PathByType(paths, ft)
|
||||||
var rd io.ReadCloser
|
|
||||||
|
|
||||||
if path == "" {
|
if path != "" {
|
||||||
// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
|
|
||||||
// to determine which workers have the unsealed file and then query those workers to know
|
|
||||||
// if they have the unsealed piece in the unsealed sector file.
|
|
||||||
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(si) == 0 {
|
|
||||||
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ?
|
|
||||||
sort.Slice(si, func(i, j int) bool {
|
|
||||||
return si[i].Weight < si[j].Weight
|
|
||||||
})
|
|
||||||
|
|
||||||
iloop:
|
|
||||||
for _, info := range si {
|
|
||||||
for _, url := range info.URLs {
|
|
||||||
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
|
|
||||||
// unsealed piece in their unsealed sector file.
|
|
||||||
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnw("check if remote has piece", "url", url, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
|
|
||||||
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
|
|
||||||
rd, err = r.readRemote(ctx, url, offset, size)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnw("reading from remote", "url", url, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
|
|
||||||
break iloop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// if we have the unsealed file locally, return a reader that can be used to read the contents of the
|
// if we have the unsealed file locally, return a reader that can be used to read the contents of the
|
||||||
// unsealed piece.
|
// unsealed piece.
|
||||||
log.Infof("Read local %s (+%d,%d)", path, offset, size)
|
log.Infof("Read local %s (+%d,%d)", path, offset, size)
|
||||||
@ -576,8 +532,53 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
|
|||||||
return pf.Reader(storiface.PaddedByteIndex(offset), size)
|
return pf.Reader(storiface.PaddedByteIndex(offset), size)
|
||||||
}
|
}
|
||||||
|
|
||||||
// note: rd can be nil
|
// --- We don't have the unsealed sector file locally
|
||||||
return rd, nil
|
|
||||||
|
// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
|
||||||
|
// to determine which workers have the unsealed file and then query those workers to know
|
||||||
|
// if they have the unsealed piece in the unsealed sector file.
|
||||||
|
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(si) == 0 {
|
||||||
|
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ?
|
||||||
|
sort.Slice(si, func(i, j int) bool {
|
||||||
|
return si[i].Weight < si[j].Weight
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, info := range si {
|
||||||
|
for _, url := range info.URLs {
|
||||||
|
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
|
||||||
|
// unsealed piece in their unsealed sector file.
|
||||||
|
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnw("check if remote has piece", "url", url, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
|
||||||
|
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
|
||||||
|
rd, err := r.readRemote(ctx, url, offset, size)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnw("reading from remote", "url", url, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
|
||||||
|
return rd, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we couldn't find a unsealed file with the unsealed piece, will return a nil reader.
|
||||||
|
log.Debugf("returning nil reader, did not find unsealed piece for %+v (+%d,%d)", s, offset, size)
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Store = &Remote{}
|
var _ Store = &Remote{}
|
||||||
|
Loading…
Reference in New Issue
Block a user