fix remote store diff
This commit is contained in:
parent
e4e60d7af4
commit
212d0fc264
130
extern/sector-storage/stores/remote.go
vendored
130
extern/sector-storage/stores/remote.go
vendored
@ -419,6 +419,71 @@ func (r *Remote) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
|
||||
url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded())
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth.Clone()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("do request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close() // nolint
|
||||
|
||||
switch resp.StatusCode {
|
||||
case http.StatusOK:
|
||||
return true, nil
|
||||
case http.StatusRequestedRangeNotSatisfiable:
|
||||
return false, nil
|
||||
default:
|
||||
return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
|
||||
if len(r.limit) >= cap(r.limit) {
|
||||
log.Infof("Throttling remote read, %d already running", len(r.limit))
|
||||
}
|
||||
|
||||
// TODO: Smarter throttling
|
||||
// * Priority (just going sequentially is still pretty good)
|
||||
// * Per interface
|
||||
// * Aware of remote load
|
||||
select {
|
||||
case r.limit <- struct{}{}:
|
||||
defer func() { <-r.limit }()
|
||||
case <-ctx.Done():
|
||||
return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
|
||||
if r.auth != nil {
|
||||
req.Header = r.auth.Clone()
|
||||
}
|
||||
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("do request: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
|
||||
resp.Body.Close() // nolint
|
||||
return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// CheckIsUnsealed checks if we have an unsealed piece at the given offset in an already unsealed sector file for the given piece
|
||||
// either locally or on any of the workers.
|
||||
// Returns true if we have the unsealed piece, false otherwise.
|
||||
@ -607,69 +672,4 @@ func (r *Remote) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.RegisteredSealProof, offset, size abi.PaddedPieceSize) (bool, error) {
|
||||
url = fmt.Sprintf("%s/%d/allocated/%d/%d", url, spt, offset.Unpadded(), size.Unpadded())
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
req.Header = r.auth.Clone()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("do request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close() // nolint
|
||||
|
||||
switch resp.StatusCode {
|
||||
case http.StatusOK:
|
||||
return true, nil
|
||||
case http.StatusRequestedRangeNotSatisfiable:
|
||||
return false, nil
|
||||
default:
|
||||
return false, xerrors.Errorf("unexpected http response: %d", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) readRemote(ctx context.Context, url string, offset, size abi.PaddedPieceSize) (io.ReadCloser, error) {
|
||||
if len(r.limit) >= cap(r.limit) {
|
||||
log.Infof("Throttling remote read, %d already running", len(r.limit))
|
||||
}
|
||||
|
||||
// TODO: Smarter throttling
|
||||
// * Priority (just going sequentially is still pretty good)
|
||||
// * Per interface
|
||||
// * Aware of remote load
|
||||
select {
|
||||
case r.limit <- struct{}{}:
|
||||
defer func() { <-r.limit }()
|
||||
case <-ctx.Done():
|
||||
return nil, xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("request: %w", err)
|
||||
}
|
||||
|
||||
if r.auth != nil {
|
||||
req.Header = r.auth.Clone()
|
||||
}
|
||||
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+size-1))
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("do request: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
|
||||
resp.Body.Close() // nolint
|
||||
return nil, xerrors.Errorf("non-200 code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return resp.Body, nil
|
||||
}
|
||||
|
||||
var _ Store = &Remote{}
|
||||
|
Loading…
Reference in New Issue
Block a user