diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index e88843ccb..744d16581 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -419,6 +419,71 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) { return out, nil } +func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) { + url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded()) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return false, xerrors.Errorf("request: %w", err) + } + req.Header = r.auth.Clone() + req = req.WithContext(ctx) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return false, xerrors.Errorf("do request: %w", err) + } + defer resp.Body.Close() // nolint + + switch resp.StatusCode { + case http.StatusOK: + return true, nil + case http.StatusRequestedRangeNotSatisfiable: + return false, nil + default: + return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode) + } +} + +func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { + if len(r.limit) >= cap(r.limit) { + log.Infof("Throttling remote read, %d already running", len(r.limit)) + } + + // TODO: Smarter throttling + // * Priority (just going sequentially is still pretty good) + // * Per interface + // * Aware of remote load + select { + case r.limit <- struct{}{}: + defer func() { <-r.limit }() + case <-ctx.Done(): + return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err()) + } + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, xerrors.Errorf("request: %w", err) + } + + if r.auth != nil { + req.Header = r.auth.Clone() + } + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)) + req = req.WithContext(ctx) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, xerrors.Errorf("do request: %w", err) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + resp.Body.Close() // nolint + return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode) + } + + return resp.Body, nil +} + // CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece // either locally or on any of the workers. // Returns true if we have the unsealed piece, false otherwise. @@ -607,69 +672,4 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac }, nil } -func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) { - url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded()) - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return false, xerrors.Errorf("request: %w", err) - } - req.Header = r.auth.Clone() - req = req.WithContext(ctx) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return false, xerrors.Errorf("do request: %w", err) - } - defer resp.Body.Close() // nolint - - switch resp.StatusCode { - case http.StatusOK: - return true, nil - case http.StatusRequestedRangeNotSatisfiable: - return false, nil - default: - return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode) - } -} - -func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) { - if len(r.limit) >= cap(r.limit) { - log.Infof("Throttling remote read, %d already running", len(r.limit)) - } - - // TODO: Smarter throttling - // * Priority (just going sequentially is still pretty good) - // * Per interface - // * Aware of remote load - select { - case r.limit <- struct{}{}: - defer func() { <-r.limit }() - case <-ctx.Done(): - return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err()) - } - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, xerrors.Errorf("request: %w", err) - } - - if r.auth != nil { - req.Header = r.auth.Clone() - } - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)) - req = req.WithContext(ctx) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, xerrors.Errorf("do request: %w", err) - } - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { - resp.Body.Close() // nolint - return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode) - } - - return resp.Body, nil -} - var _ Store = &Remote{}