From c58048d16ac0b48ae25e5b0bb194a006623e87a0 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 19 May 2021 11:36:37 +0530 Subject: [PATCH] address review comments --- extern/sector-storage/piece_provider.go | 8 +++ extern/sector-storage/stores/remote.go | 95 +++++++++++++------------ 2 files changed, 56 insertions(+), 47 deletions(-) diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index acc799273..fd54d2166 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -52,6 +52,9 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err) } + // Reader returns a reader for an unsealed piece at the given offset in the given sector. + // The returned reader will be nil if none of the workers has an unsealed sector file containing + // the unsealed piece. r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded()) if err != nil { cancel() @@ -85,7 +88,11 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, } var uns bool + if r == nil { + // a nil reader means that none of the workers has an unsealed sector file + // containing the unsealed piece. + // we now need to unseal a sealed sector file for the given sector to read the unsealed piece from it. uns = true commd := &unsealed if unsealed == cid.Undef { @@ -111,6 +118,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, upr, err := fr32.NewUnpadReader(r, size.Padded()) if err != nil { + unlock() return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err) } diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index dc556ad92..a09c87761 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -498,52 +498,8 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a } path := storiface.PathByType(paths, ft) - var rd io.ReadCloser - if path == "" { - // if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index - // to determine which workers have the unsealed file and then query those workers to know - // if they have the unsealed piece in the unsealed sector file. - si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false) - if err != nil { - return nil, err - } - - if len(si) == 0 { - return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) - } - - // TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ? - sort.Slice(si, func(i, j int) bool { - return si[i].Weight < si[j].Weight - }) - - iloop: - for _, info := range si { - for _, url := range info.URLs { - // checkAllocated makes a JSON RPC query to a remote worker to determine if it has - // unsealed piece in their unsealed sector file. - ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size) - if err != nil { - log.Warnw("check if remote has piece", "url", url, "error", err) - continue - } - if !ok { - continue - } - - // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. - // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. - rd, err = r.readRemote(ctx, url, offset, size) - if err != nil { - log.Warnw("reading from remote", "url", url, "error", err) - continue - } - log.Infof("Read remote %s (+%d,%d)", url, offset, size) - break iloop - } - } - } else { + if path != "" { // if we have the unsealed file locally, return a reader that can be used to read the contents of the // unsealed piece. log.Infof("Read local %s (+%d,%d)", path, offset, size) @@ -576,8 +532,53 @@ func (r *Remote) Reader(ctx context.Context, s storage.SectorRef, offset, size a return pf.Reader(storiface.PaddedByteIndex(offset), size) } - // note: rd can be nil - return rd, nil + // --- We don't have the unsealed sector file locally + + // if we don't have the unsealed sector file locally, we'll first lookup the Miner Sector Store Index + // to determine which workers have the unsealed file and then query those workers to know + // if they have the unsealed piece in the unsealed sector file. + si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false) + if err != nil { + return nil, err + } + + if len(si) == 0 { + return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) + } + + // TODO Why are we sorting in ascending order here -> shouldn't we sort in descending order as higher weight means more likely to have the file ? + sort.Slice(si, func(i, j int) bool { + return si[i].Weight < si[j].Weight + }) + + for _, info := range si { + for _, url := range info.URLs { + // checkAllocated makes a JSON RPC query to a remote worker to determine if it has + // unsealed piece in their unsealed sector file. + ok, err := r.checkAllocated(ctx, url, s.ProofType, offset, size) + if err != nil { + log.Warnw("check if remote has piece", "url", url, "error", err) + continue + } + if !ok { + continue + } + + // readRemote fetches a reader that we can use to read the unsealed piece from the remote worker. + // It uses a ranged HTTP query to ensure we ONLY read the unsealed piece and not the entire unsealed file. + rd, err := r.readRemote(ctx, url, offset, size) + if err != nil { + log.Warnw("reading from remote", "url", url, "error", err) + continue + } + log.Infof("Read remote %s (+%d,%d)", url, offset, size) + return rd, nil + } + } + + // we couldn't find a unsealed file with the unsealed piece, will return a nil reader. + log.Debugf("returning nil reader, did not find unsealed piece for %+v (+%d,%d)", s, offset, size) + return nil, nil } var _ Store = &Remote{}