address review comments

This commit is contained in:
aarshkshah1992 2021-05-19 11:36:37 +05:30 committed by Dirk McCormick
parent d33d426692
commit c58048d16a
2 changed files with 56 additions and 47 deletions

View File

@ -52,6 +52,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err) return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
} }
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded()) r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
if err != nil { if err != nil {
cancel() cancel()
@ -85,7 +88,11 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
} }
var uns bool var uns bool
if r == nil { if r == nil {
// a nil reader means that none of the workers has an unsealed sector file
// containing the unsealed piece.
// we now need to unseal a sealed sector file for the given sector to read the unsealed piece from it.
uns = true uns = true
commd := &unsealed commd := &unsealed
if unsealed == cid.Undef { if unsealed == cid.Undef {
@ -111,6 +118,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
upr, err := fr32.NewUnpadReader(r, size.Padded()) upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil { if err != nil {
unlock()
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err) return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
} }

View File

@ -498,52 +498,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
} }
path := storiface.PathByType(paths, ft) path := storiface.PathByType(paths, ft)
var rd io.ReadCloser
if path == "" { if path != "" {
// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
// to determine which workers have the unsealed file and then query those workers to know
// if they have the unsealed piece in the unsealed sector file.
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return nil, err
}
if len(si) == 0 {
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ?
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})
iloop:
for _, info := range si {
for _, url := range info.URLs {
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
// unsealed piece in their unsealed sector file.
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
continue
}
if !ok {
continue
}
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err = r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
break iloop
}
}
} else {
// if we have the unsealed file locally, return a reader that can be used to read the contents of the // if we have the unsealed file locally, return a reader that can be used to read the contents of the
// unsealed piece. // unsealed piece.
log.Infof("Read local %s (+%d,%d)", path, offset, size) log.Infof("Read local %s (+%d,%d)", path, offset, size)
@ -576,8 +532,53 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a
return pf.Reader(storiface.PaddedByteIndex(offset), size) return pf.Reader(storiface.PaddedByteIndex(offset), size)
} }
// note: rd can be nil // --- We don't have the unsealed sector file locally
return rd, nil
// if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index
// to determine which workers have the unsealed file and then query those workers to know
// if they have the unsealed piece in the unsealed sector file.
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
return nil, err
}
if len(si) == 0 {
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
// TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ?
sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight
})
for _, info := range si {
for _, url := range info.URLs {
// checkAllocated makes a JSON RPC query to a remote worker to determine if it has
// unsealed piece in their unsealed sector file.
ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size)
if err != nil {
log.Warnw("check if remote has piece", "url", url, "error", err)
continue
}
if !ok {
continue
}
// readRemote fetches a reader that we can use to read the unsealed piece from the remote worker.
// It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file.
rd, err := r.readRemote(ctx, url, offset, size)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
continue
}
log.Infof("Read remote %s (+%d,%d)", url, offset, size)
return rd, nil
}
}
// we couldn't find a unsealed file with the unsealed piece, will return a nil reader.
log.Debugf("returning nil reader, did not find unsealed piece for %+v (+%d,%d)", s, offset, size)
return nil, nil
} }
var _ Store = &Remote{} var _ Store = &Remote{}